From ab675935d75765bdad2f6d03a05eb6bd036d4d7e Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 20 Sep 2024 12:57:47 +1000 Subject: [PATCH] feat(output): Allow keepalive config (#378) --- pkg/output/xatu/config.go | 7 +++++++ pkg/output/xatu/exporter.go | 22 +++++++++++++++++----- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/output/xatu/config.go b/pkg/output/xatu/config.go index 2716c596..31f5bb5c 100644 --- a/pkg/output/xatu/config.go +++ b/pkg/output/xatu/config.go @@ -16,6 +16,13 @@ type Config struct { Workers int `yaml:"workers" default:"1"` Retry RetryConfig `yaml:"retry"` AuthorizationSecret string `yaml:"authorizationSecret"` + KeepAlive KeepAliveConfig `yaml:"keepAlive"` +} + +type KeepAliveConfig struct { + Enabled bool `yaml:"enabled" default:"true"` + Time time.Duration `yaml:"time" default:"10s"` + Timeout time.Duration `yaml:"timeout" default:"30s"` } func (c *Config) Validate() error { diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index 5aac573c..c621d762 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -32,14 +32,26 @@ type ItemExporter struct { } func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { + log = log.WithField("output_name", name).WithField("output_type", SinkType) + opts := []grpc.DialOption{ grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor, retry.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor, retry.StreamClientInterceptor()), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 10 * time.Second, - Timeout: 30 * time.Second, + } + + if config.KeepAlive.Enabled { + log. + WithField("keepalive_time", config.KeepAlive.Time). + WithField("keepalive_timeout", config.KeepAlive.Timeout). + Info("Enabling keepalive") + + opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: config.KeepAlive.Time, + Timeout: config.KeepAlive.Timeout, PermitWithoutStream: true, - }), + })) + } else { + log.Info("Disabling keepalive") } if config.TLS { @@ -60,7 +72,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE return ItemExporter{ config: config, - log: log.WithField("output_name", name).WithField("output_type", SinkType), + log: log, conn: conn, client: pb.NewEventIngesterClient(conn), }, nil