Skip to content

Commit

Permalink
fix: xds translation return early, should be done in a best-effort wa…
Browse files Browse the repository at this point in the history
…y instead

Signed-off-by: huabing zhao <zhaohuabing@gmail.com>
  • Loading branch information
zhaohuabing committed Nov 17, 2023
1 parent 1789d98 commit 7fd051c
Showing 1 changed file with 57 additions and 33 deletions.
90 changes: 57 additions & 33 deletions internal/xds/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,50 +64,68 @@ func (t *Translator) Translate(ir *ir.Xds) (*types.ResourceVersionTable, error)

tCtx := new(types.ResourceVersionTable)

if err := t.processHTTPListenerXdsTranslation(tCtx, ir.HTTP, ir.AccessLog, ir.Tracing, ir.Metrics); err != nil {
return nil, err
// xDS translation is done in a best-effort manner, so we collect all errors
// and return them at the end.
var errs error
if err := t.processHTTPListenerXdsTranslation(
tCtx, ir.HTTP, ir.AccessLog, ir.Tracing, ir.Metrics); err != nil {
errs = multierror.Append(errs, err)
}

if err := processTCPListenerXdsTranslation(tCtx, ir.TCP, ir.AccessLog); err != nil {
return nil, err
errs = multierror.Append(errs, err)
}

if err := processUDPListenerXdsTranslation(tCtx, ir.UDP, ir.AccessLog); err != nil {
return nil, err
errs = multierror.Append(errs, err)
}

if err := processJSONPatches(tCtx, ir.EnvoyPatchPolicies); err != nil {
return nil, err
errs = multierror.Append(errs, err)
}

if err := processClusterForAccessLog(tCtx, ir.AccessLog); err != nil {
return nil, err
errs = multierror.Append(errs, err)
}
if err := processClusterForTracing(tCtx, ir.Tracing); err != nil {
return nil, err
errs = multierror.Append(errs, err)
}

// Check if an extension want to inject any clusters/secrets
// If no extension exists (or it doesn't subscribe to this hook) then this is a quick no-op
if err := processExtensionPostTranslationHook(tCtx, t.ExtensionManager); err != nil {
return nil, err
errs = multierror.Append(errs, err)
}

return tCtx, nil
return tCtx, errs
}

func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersionTable, httpListeners []*ir.HTTPListener,
accesslog *ir.AccessLog, tracing *ir.Tracing, metrics *ir.Metrics) error {
func (t *Translator) processHTTPListenerXdsTranslation(
tCtx *types.ResourceVersionTable,
httpListeners []*ir.HTTPListener,
accessLog *ir.AccessLog,
tracing *ir.Tracing,
metrics *ir.Metrics,
) error {
// The XDS translation is done in a best-effort manner, so we collect all
// errors and return them at the end.
var errs error
for _, httpListener := range httpListeners {
addFilterChain := true
var xdsRouteCfg *routev3.RouteConfiguration

// Search for an existing listener, if it does not exist, create one.
xdsListener := findXdsListenerByHostPort(tCtx, httpListener.Address, httpListener.Port, corev3.SocketAddress_TCP)
xdsListener := findXdsListenerByHostPort(
tCtx, httpListener.Address, httpListener.Port, corev3.SocketAddress_TCP)
if xdsListener == nil {
xdsListener = buildXdsTCPListener(httpListener.Name, httpListener.Address, httpListener.Port, httpListener.TCPKeepalive, accesslog)
xdsListener = buildXdsTCPListener(
httpListener.Name, httpListener.Address, httpListener.Port,
httpListener.TCPKeepalive, accessLog)
if err := tCtx.AddXdsResource(resourcev3.ListenerType, xdsListener); err != nil {
return err
// skip this listener if failed to add xds listener to the
// resource version table. Normally, this should not happen.
errs = multierror.Append(errs, err)
continue
}
} else if httpListener.TLS == nil {
// Find the route config associated with this listener that
Expand All @@ -120,14 +138,16 @@ func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersi
addFilterChain = false
xdsRouteCfg = findXdsRouteConfig(tCtx, routeName)
if xdsRouteCfg == nil {
return errors.New("unable to find xds route config")
// skip this listener if failed to find xds route config
errs = multierror.Append(errs, errors.New("unable to find xds route config"))
continue
}
}
}

if addFilterChain {
if err := t.addXdsHTTPFilterChain(xdsListener, httpListener, accesslog, tracing); err != nil {
return err
if err := t.addXdsHTTPFilterChain(xdsListener, httpListener, accessLog, tracing); err != nil {
errs = multierror.Append(errs, err)
}
}

Expand All @@ -139,7 +159,7 @@ func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersi
}

if err := tCtx.AddXdsResource(resourcev3.RouteType, xdsRouteCfg); err != nil {
return err
errs = multierror.Append(errs, err)
}
}

Expand All @@ -148,7 +168,7 @@ func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersi
for t := range httpListener.TLS {
secret := buildXdsDownstreamTLSSecret(httpListener.TLS[t])
if err := tCtx.AddXdsResource(resourcev3.SecretType, secret); err != nil {
return err
errs = multierror.Append(errs, err)
}
}
}
Expand Down Expand Up @@ -197,26 +217,30 @@ func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersi
// 1:1 between IR HTTPRoute and xDS config.route.v3.Route
xdsRoute, err := buildXdsRoute(httpRoute)
if err != nil {
return err
// skip this route if failed to build xds route
errs = multierror.Append(errs, err)
continue
}

// Check if an extension want to modify the route we just generated
// If no extension exists (or it doesn't subscribe to this hook) then this is a quick no-op.
if err := processExtensionPostRouteHook(xdsRoute, vHost, httpRoute, t.ExtensionManager); err != nil {
return err
if err = processExtensionPostRouteHook(xdsRoute, vHost, httpRoute, t.ExtensionManager); err != nil {
if err != nil {
errs = multierror.Append(errs, err)
}
}

vHost.Routes = append(vHost.Routes, xdsRoute)

if httpRoute.Destination != nil {
if err := addXdsCluster(tCtx, &xdsClusterArgs{
if err = addXdsCluster(tCtx, &xdsClusterArgs{
name: httpRoute.Destination.Name,
settings: httpRoute.Destination.Settings,
tSocket: nil,
endpointType: EndpointTypeStatic,
loadBalancer: httpRoute.LoadBalancer,
}); err != nil && !errors.Is(err, ErrXdsClusterExists) {
return err
errs = multierror.Append(errs, err)
}
}

Expand All @@ -228,7 +252,7 @@ func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersi
tSocket: nil,
endpointType: EndpointTypeStatic,
}); err != nil && !errors.Is(err, ErrXdsClusterExists) {
return err
errs = multierror.Append(errs, err)
}
}
}
Expand All @@ -238,46 +262,46 @@ func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersi
// Check if an extension want to modify the Virtual Host we just generated
// If no extension exists (or it doesn't subscribe to this hook) then this is a quick no-op.
if err := processExtensionPostVHostHook(vHost, t.ExtensionManager); err != nil {
return err
errs = multierror.Append(errs, err)
}
}
xdsRouteCfg.VirtualHosts = append(xdsRouteCfg.VirtualHosts, vHostsList...)

// Add per-route filter configs to the route config.
if err := patchRouteCfgWithPerRouteConfig(xdsRouteCfg, httpListener); err != nil {
return err
errs = multierror.Append(errs, err)
}

// TODO: Make this into a generic interface for API Gateway features.
// https://github.com/envoyproxy/gateway/issues/882
// Check if a ratelimit cluster exists, if not, add it, if its needed.
if err := t.createRateLimitServiceCluster(tCtx, httpListener); err != nil {
return err
errs = multierror.Append(errs, err)
}

// Create authn jwks clusters, if needed.
if err := createJWKSClusters(tCtx, httpListener.Routes); err != nil {
return err
errs = multierror.Append(errs, err)
}

// Create oauth2 token endpoint clusters, if needed.
if err := createOAuth2TokenEndpointClusters(tCtx, httpListener.Routes); err != nil {
return err
errs = multierror.Append(errs, err)
}

// Create oauth2 client and HMAC secrets, if needed.
if err := createOAuth2Secrets(tCtx, httpListener.Routes); err != nil {
return err
errs = multierror.Append(errs, err)
}

// Check if an extension want to modify the listener that was just configured/created
// If no extension exists (or it doesn't subscribe to this hook) then this is a quick no-op
if err := processExtensionPostListenerHook(tCtx, xdsListener, t.ExtensionManager); err != nil {
return err
errs = multierror.Append(errs, err)
}
}

return nil
return errs
}

func processTCPListenerXdsTranslation(tCtx *types.ResourceVersionTable, tcpListeners []*ir.TCPListener, accesslog *ir.AccessLog) error {
Expand Down

0 comments on commit 7fd051c

Please sign in to comment.