Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Timeouts for GRPC and Streaming Servers #221

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 73 additions & 21 deletions provider/cmd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/url"
"os"
"path/filepath"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -66,9 +67,7 @@ func init() {
}

type Options struct {
Address string
StreamingAddress string
BaseURL string
BaseURL string

RootDir string

Expand All @@ -83,11 +82,30 @@ type Options struct {

Libvirt LibvirtOptions
NicPlugin *networkinterfaceplugin.Options
Servers ServersOptions

GCVMGracefulShutdownTimeout time.Duration
ResyncIntervalGarbageCollector time.Duration
}

type HTTPServerOptions struct {
Addr string
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
GracefulTimeout time.Duration
}

type GRPCServerOptions struct {
Addr string
ConnectionTimeout time.Duration
}

type ServersOptions struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will create conflict with #196

Streaming HTTPServerOptions
GRPC GRPCServerOptions
}

type LibvirtOptions struct {
Socket string
Address string
Expand All @@ -100,15 +118,21 @@ type LibvirtOptions struct {
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Address, "address", "/var/run/iri-machinebroker.sock", "Address to listen on.")
fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.")
// ServerOptions
fs.StringVar(&o.Servers.GRPC.Addr, "servers-grpc-address", "/var/run/iri-machinebroker.sock", "Address to listen on.")
fs.DurationVar(&o.Servers.GRPC.ConnectionTimeout, "servers-grpc-connectiontimeout", 3*time.Second, "Connection timeout for GRPC server.")
fs.StringVar(&o.Servers.Streaming.Addr, "servers-streaming-address", "127.0.0.1:20251", "Address at which the stream server will listen")
fs.DurationVar(&o.Servers.Streaming.ReadTimeout, "servers-streaming-readtimeout", 200*time.Millisecond, "Read timeout for streaming server.")
fs.DurationVar(&o.Servers.Streaming.WriteTimeout, "servers-streaming-writetimeout", 200*time.Millisecond, "Write timeout for streaming server.")
fs.DurationVar(&o.Servers.Streaming.IdleTimeout, "servers-streaming-idletimeout", 1*time.Second, "Idle timeout for connections to streaming server.")
fs.DurationVar(&o.Servers.Streaming.GracefulTimeout, "servers-streaming-gracefultimeout", 2*time.Second, "Graceful timeout to shutdown streaming server. Ideally set it little longer than idletimeout.")

fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.")
fs.StringVar(&o.PathSupportedMachineClasses, "supported-machine-classes", o.PathSupportedMachineClasses, "File containing supported machine classes.")
fs.DurationVar(&o.ResyncIntervalVolumeSize, "volume-size-resync-interval", 1*time.Minute, "Interval to determine volume size changes.")

fs.StringVar(&o.ApinetKubeconfig, "apinet-kubeconfig", "", "Path to the kubeconfig file for the apinet-cluster.")

fs.StringVar(&o.StreamingAddress, "streaming-address", "127.0.0.1:20251", "Address to run the streaming server on")
fs.StringVar(&o.BaseURL, "base-url", "", "The base url to construct urls for streaming from. If empty it will be "+
"constructed from the streaming-address")

Expand Down Expand Up @@ -189,7 +213,7 @@ func Run(ctx context.Context, opts Options) error {
if baseURL == "" {
u := &url.URL{
Scheme: "http",
Host: opts.StreamingAddress,
Host: opts.Servers.Streaming.Addr,
}
baseURL = u.String()
}
Expand Down Expand Up @@ -377,7 +401,7 @@ func Run(ctx context.Context, opts Options) error {

g.Go(func() error {
setupLog.Info("Starting grpc server")
if err := runGRPCServer(ctx, setupLog, log, srv, opts); err != nil {
if err := runGRPCServer(ctx, setupLog, log, srv, opts.Servers.GRPC); err != nil {
setupLog.Error(err, "failed to start grpc server")
return err
}
Expand All @@ -386,7 +410,7 @@ func Run(ctx context.Context, opts Options) error {

g.Go(func() error {
setupLog.Info("Starting streaming server")
if err := runStreamingServer(ctx, setupLog, log, srv, opts); err != nil {
if err := runStreamingServer(ctx, setupLog, log, srv, opts.Servers.Streaming); err != nil {
setupLog.Error(err, "failed to start streaming server")
return err
}
Expand All @@ -396,9 +420,9 @@ func Run(ctx context.Context, opts Options) error {
return g.Wait()
}

func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error {
func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts GRPCServerOptions) error {
setupLog.V(1).Info("Cleaning up any previous socket")
if err := common.CleanupSocketIfExists(opts.Address); err != nil {
if err := common.CleanupSocketIfExists(opts.Addr); err != nil {
return fmt.Errorf("error cleaning up socket: %w", err)
}

Expand All @@ -407,48 +431,76 @@ func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, s
commongrpc.InjectLogger(log.WithName("iri-server")),
commongrpc.LogRequest,
),
grpc.ConnectionTimeout(opts.ConnectionTimeout),
)
iri.RegisterMachineRuntimeServer(grpcSrv, srv)

setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Address)
l, err := net.Listen("unix", opts.Address)
setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Addr)
l, err := net.Listen("unix", opts.Addr)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}

setupLog.Info("Starting grpc server", "Address", l.Addr().String())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
setupLog.Info("Shutting down grpc server")
grpcSrv.GracefulStop()
setupLog.Info("Shut down grpc server")
setupLog.Info("GRPC server is shutdown")
}()

setupLog.Info("Starting grpc server", "Address", l.Addr().String())
if err := grpcSrv.Serve(l); err != nil {
return fmt.Errorf("error serving grpc: %w", err)
}

setupLog.Info("GRPC server stopped serving requests")

wg.Wait()

return nil
}

func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts Options) error {
func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts HTTPServerOptions) error {
httpHandler := providerhttp.NewHandler(srv, providerhttp.HandlerOptions{
Log: log.WithName("streaming-server"),
})

httpSrv := &http.Server{
Addr: opts.StreamingAddress,
Handler: httpHandler,
Addr: opts.Addr,
Handler: httpHandler,
ReadTimeout: opts.ReadTimeout,
WriteTimeout: opts.WriteTimeout,
IdleTimeout: opts.IdleTimeout,
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
setupLog.Info("Shutting down streaming server")
_ = httpSrv.Close()
setupLog.Info("Shut down streaming server")
shutdownCtx, cancel := context.WithTimeout(context.Background(), opts.GracefulTimeout)
defer cancel()

locErr := httpSrv.Shutdown(shutdownCtx)
lukas016 marked this conversation as resolved.
Show resolved Hide resolved
if locErr != nil {
setupLog.Error(locErr, "streaming server wasn't shutdown properly")
} else {
setupLog.Info("Streaming server is shutdown")
}
}()

setupLog.V(1).Info("Starting streaming server", "Address", opts.StreamingAddress)
setupLog.V(1).Info("Starting streaming server", "Address", opts.Addr)
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("error listening / serving streaming server: %w", err)
}

setupLog.Info("Streaming server stopped serving requests")

wg.Wait()

return nil
}
19 changes: 15 additions & 4 deletions provider/server/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,9 @@ var _ = BeforeSuite(func() {
Expect(os.Chmod(tempDir, 0730)).Should(Succeed())

opts := app.Options{
Address: filepath.Join(tempDir, "test.sock"),
BaseURL: baseURL,
PathSupportedMachineClasses: machineClassesFile.Name(),
RootDir: filepath.Join(tempDir, "libvirt-provider"),
StreamingAddress: streamingAddress,
Libvirt: app.LibvirtOptions{
Socket: "/var/run/libvirt/libvirt-sock",
URI: "qemu:///system",
Expand All @@ -118,6 +116,19 @@ var _ = BeforeSuite(func() {
ResyncIntervalGarbageCollector: resyncGarbageCollectorInterval,
ResyncIntervalVolumeSize: resyncVolumeSizeInterval,
GuestAgent: app.GuestAgentOption(api.GuestAgentNone),
Servers: app.ServersOptions{
GRPC: app.GRPCServerOptions{
Addr: filepath.Join(tempDir, "test.sock"),
ConnectionTimeout: 3 * time.Second,
},
Streaming: app.HTTPServerOptions{
Addr: streamingAddress,
ReadTimeout: 200 * time.Millisecond,
WriteTimeout: 200 * time.Millisecond,
IdleTimeout: 1 * time.Second,
GracefulTimeout: 2 * time.Second,
},
},
}

srvCtx, cancel := context.WithCancel(context.Background())
Expand All @@ -129,10 +140,10 @@ var _ = BeforeSuite(func() {
}()

Eventually(func() error {
return isSocketAvailable(opts.Address)
return isSocketAvailable(opts.Servers.GRPC.Addr)
}).WithTimeout(30 * time.Second).WithPolling(500 * time.Millisecond).Should(Succeed())

address, err := machine.GetAddressWithTimeout(3*time.Second, fmt.Sprintf("unix://%s", opts.Address))
address, err := machine.GetAddressWithTimeout(3*time.Second, fmt.Sprintf("unix://%s", opts.Servers.GRPC.Addr))
Expect(err).NotTo(HaveOccurred())

gconn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down
Loading