Skip to content

Commit

Permalink
fix: ensure the xds grpc server is properly stopped (#1860)
Browse files Browse the repository at this point in the history
  • Loading branch information
shawnh2 authored Sep 12, 2023
1 parent 2ffd67b commit 98cb3a8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
15 changes: 9 additions & 6 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (r *Runner) Start(ctx context.Context) error {
discoveryv3.RegisterAggregatedDiscoveryServiceServer(r.grpc, serverv3.NewServer(ctx, r.cache, cb))

// Start and listen xDS gRPC config Server.
go r.serverXdsConfigServer(ctx)
go r.serveXdsConfigServer(ctx)

// Start message Subscription.
go r.subscribeAndTranslate(ctx)
Expand All @@ -90,20 +90,23 @@ func (r *Runner) Start(ctx context.Context) error {
return nil
}

func (r *Runner) serverXdsConfigServer(ctx context.Context) {
func (r *Runner) serveXdsConfigServer(ctx context.Context) {
addr := net.JoinHostPort(XdsGrpcSotwConfigServerAddress, strconv.Itoa(ratelimit.XdsGrpcSotwConfigServerPort))
l, err := net.Listen("tcp", addr)
if err != nil {
r.Logger.Error(err, "failed to listen on address", "address", addr)
return
}

go func() {
<-ctx.Done()
r.Logger.Info("grpc server shutting down")
r.grpc.Stop()
}()

if err = r.grpc.Serve(l); err != nil {
r.Logger.Error(err, "failed to start grpc based xds config server")
}

<-ctx.Done()
r.Logger.Info("grpc config server shutting down")
r.grpc.Stop()
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
Expand Down
22 changes: 12 additions & 10 deletions internal/xds/server/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,20 @@ func (r *Runner) serveXdsServer(ctx context.Context) {
r.Logger.Error(err, "failed to listen on address", "address", addr)
return
}
err = r.grpc.Serve(l)
if err != nil {

go func() {
<-ctx.Done()
r.Logger.Info("grpc server shutting down")
// We don't use GracefulStop here because envoy
// has long-lived hanging xDS requests. There's no
// mechanism to make those pending requests fail,
// so we forcibly terminate the TCP sessions.
r.grpc.Stop()
}()

if err = r.grpc.Serve(l); err != nil {
r.Logger.Error(err, "failed to start grpc based xds server")
}

<-ctx.Done()
r.Logger.Info("grpc server shutting down")
// We don't use GracefulStop here because envoy
// has long-lived hanging xDS requests. There's no
// mechanism to make those pending requests fail,
// so we forcibly terminate the TCP sessions.
r.grpc.Stop()
}

// registerServer registers the given xDS protocol Server with the gRPC
Expand Down

0 comments on commit 98cb3a8

Please sign in to comment.