Skip to content

Commit

Permalink
adding streaming server graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
so-sahu committed Mar 19, 2024
1 parent 1027550 commit 7e4925c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 27 deletions.
73 changes: 50 additions & 23 deletions provider/cmd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/url"
"os"
"path/filepath"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -88,10 +89,11 @@ type Options struct {
}

type HTTPServerOptions struct {
Addr string
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
Addr string
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
GracefulTimeout time.Duration
}

type GRPCServerOptions struct {
Expand Down Expand Up @@ -119,10 +121,11 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
// ServerOptions
fs.StringVar(&o.Servers.GRPC.Addr, "servers-grpc-address", "/var/run/iri-machinebroker.sock", "Address to listen on.")
fs.DurationVar(&o.Servers.GRPC.ConnectionTimeout, "servers-grpc-connectiontimeout", 3*time.Second, "Connection timeout for GRPC server.")
fs.StringVar(&o.Servers.Streaming.Addr, "servers-streaming-address", "127.0.0.1:20251", "Address to run the streaming server on")
fs.StringVar(&o.Servers.Streaming.Addr, "servers-streaming-address", "127.0.0.1:20251", "Address at which the stream server will listen")
fs.DurationVar(&o.Servers.Streaming.ReadTimeout, "servers-streaming-readtimeout", 200*time.Millisecond, "Read timeout for streaming server.")
fs.DurationVar(&o.Servers.Streaming.WriteTimeout, "servers-streaming-writetimeout", 200*time.Millisecond, "Write timeout for streaming server.")
fs.DurationVar(&o.Servers.Streaming.IdleTimeout, "server-streaming-idletimeout", 1*time.Second, "Idle timeout for connections to streaming server.")
fs.DurationVar(&o.Servers.Streaming.IdleTimeout, "servers-streaming-idletimeout", 1*time.Second, "Idle timeout for connections to streaming server.")
fs.DurationVar(&o.Servers.Streaming.GracefulTimeout, "servers-streaming-gracefultimeout", 2*time.Second, "Graceful timeout to shutdown streaming server. Ideally set it little longer than idletimeout.")

fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.")
fs.StringVar(&o.PathSupportedMachineClasses, "supported-machine-classes", o.PathSupportedMachineClasses, "File containing supported machine classes.")
Expand Down Expand Up @@ -398,7 +401,7 @@ func Run(ctx context.Context, opts Options) error {

g.Go(func() error {
setupLog.Info("Starting grpc server")
if err := runGRPCServer(ctx, setupLog, log, srv, opts); err != nil {
if err := runGRPCServer(ctx, setupLog, log, srv, opts.Servers.GRPC); err != nil {
setupLog.Error(err, "failed to start grpc server")
return err
}
Expand All @@ -407,7 +410,7 @@ func Run(ctx context.Context, opts Options) error {

g.Go(func() error {
setupLog.Info("Starting streaming server")
if err := runStreamingServer(ctx, setupLog, log, srv, opts); err != nil {
if err := runStreamingServer(ctx, setupLog, log, srv, opts.Servers.Streaming); err != nil {
setupLog.Error(err, "failed to start streaming server")
return err
}
Expand All @@ -417,9 +420,9 @@ func Run(ctx context.Context, opts Options) error {
return g.Wait()
}

func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error {
func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts GRPCServerOptions) error {
setupLog.V(1).Info("Cleaning up any previous socket")
if err := common.CleanupSocketIfExists(opts.Servers.GRPC.Addr); err != nil {
if err := common.CleanupSocketIfExists(opts.Addr); err != nil {
return fmt.Errorf("error cleaning up socket: %w", err)
}

Expand All @@ -428,52 +431,76 @@ func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, s
commongrpc.InjectLogger(log.WithName("iri-server")),
commongrpc.LogRequest,
),
grpc.ConnectionTimeout(opts.Servers.GRPC.ConnectionTimeout),
grpc.ConnectionTimeout(opts.ConnectionTimeout),
)
iri.RegisterMachineRuntimeServer(grpcSrv, srv)

setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Servers.GRPC.Addr)
l, err := net.Listen("unix", opts.Servers.GRPC.Addr)
setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Addr)
l, err := net.Listen("unix", opts.Addr)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}

setupLog.Info("Starting grpc server", "Address", l.Addr().String())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
setupLog.Info("Shutting down grpc server")
grpcSrv.GracefulStop()
setupLog.Info("Shut down grpc server")
setupLog.Info("GRPC server is shutdown")
}()

setupLog.Info("Starting grpc server", "Address", l.Addr().String())
if err := grpcSrv.Serve(l); err != nil {
return fmt.Errorf("error serving grpc: %w", err)
}

setupLog.Info("GRPC server stopped serving requests")

wg.Wait()

return nil
}

func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts Options) error {
func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts HTTPServerOptions) error {
httpHandler := providerhttp.NewHandler(srv, providerhttp.HandlerOptions{
Log: log.WithName("streaming-server"),
})

httpSrv := &http.Server{
Addr: opts.Servers.Streaming.Addr,
Addr: opts.Addr,
Handler: httpHandler,
ReadTimeout: opts.Servers.Streaming.ReadTimeout,
WriteTimeout: opts.Servers.Streaming.WriteTimeout,
IdleTimeout: opts.Servers.Streaming.IdleTimeout,
ReadTimeout: opts.ReadTimeout,
WriteTimeout: opts.WriteTimeout,
IdleTimeout: opts.IdleTimeout,
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
setupLog.Info("Shutting down streaming server")
_ = httpSrv.Close()
setupLog.Info("Shut down streaming server")
shutdownCtx, cancel := context.WithTimeout(context.Background(), opts.GracefulTimeout)
defer cancel()

locErr := httpSrv.Shutdown(shutdownCtx)
if locErr != nil {
setupLog.Error(locErr, "streaming server wasn't shutdown properly")
} else {
setupLog.Info("Streaming server is shutdown")
}
}()

setupLog.V(1).Info("Starting streaming server", "Address", opts.Servers.Streaming.Addr)
setupLog.V(1).Info("Starting streaming server", "Address", opts.Addr)
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("error listening / serving streaming server: %w", err)
}

setupLog.Info("Streaming server stopped serving requests")

wg.Wait()

return nil
}
9 changes: 5 additions & 4 deletions provider/server/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ var _ = BeforeSuite(func() {
ConnectionTimeout: 3 * time.Second,
},
Streaming: app.HTTPServerOptions{
Addr: streamingAddress,
ReadTimeout: 200 * time.Millisecond,
WriteTimeout: 200 * time.Millisecond,
IdleTimeout: 1 * time.Second,
Addr: streamingAddress,
ReadTimeout: 200 * time.Millisecond,
WriteTimeout: 200 * time.Millisecond,
IdleTimeout: 1 * time.Second,
GracefulTimeout: 2 * time.Second,
},
},
}
Expand Down

0 comments on commit 7e4925c

Please sign in to comment.