From 64d051156699b23f4e0af6ed58d9cbee74e61c9d Mon Sep 17 00:00:00 2001 From: Joseph Anttila Hall Date: Thu, 4 Nov 2021 17:23:45 -0700 Subject: [PATCH] proxy-agent: support new SyncForever mode. This is a minimal proxy-agent half of https://github.com/kubernetes-sigs/apiserver-network-proxy/issues/273 It is already useful, in the scenario where proxy-server are restarted. --- cmd/agent/app/options/options.go | 10 ++++++++-- pkg/agent/clientset.go | 17 ++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index 2df464b84..bb35f2297 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -42,7 +42,7 @@ type GrpcProxyAgentOptions struct { SyncIntervalCap time.Duration // After a duration of this time if the agent doesn't see any activity it // pings the server to see if the transport is still alive. - KeepaliveTime time.Duration + KeepaliveTime time.Duration // file contains service account authorization token for enabling proxy-server token based authorization ServiceAccountTokenPath string @@ -53,6 +53,8 @@ type GrpcProxyAgentOptions struct { // blocking call has its own problems, so it cannot easily be made race condition safe. // The check is an "unlocked" read but is still use at your own peril. WarnOnChannelLimit bool + + SyncForever bool } func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig { @@ -65,7 +67,8 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) SyncIntervalCap: o.SyncIntervalCap, DialOptions: dialOptions, ServiceAccountTokenPath: o.ServiceAccountTokenPath, - WarnOnChannelLimit: o.WarnOnChannelLimit, + WarnOnChannelLimit: o.WarnOnChannelLimit, + SyncForever: o.SyncForever, } } @@ -89,6 +92,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.StringVar(&o.ServiceAccountTokenPath, "service-account-token-path", o.ServiceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.") flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true") flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.") + flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "When used, the agent continues syncing, in order to support server count changes.") return flags } @@ -111,6 +115,7 @@ func (o *GrpcProxyAgentOptions) Print() { klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.ServiceAccountTokenPath) klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers)) klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit) + klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever) } func (o *GrpcProxyAgentOptions) Validate() error { @@ -199,6 +204,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { KeepaliveTime: 1 * time.Hour, ServiceAccountTokenPath: "", WarnOnChannelLimit: false, + SyncForever: false, } return &o } diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index f9efdda1b..241aa234d 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -38,7 +38,7 @@ type ClientSet struct { address string // proxy server address. Assuming HA proxy server serverCount int // number of proxy server instances, should be 1 // unless it is an HA server. Initialized when the ClientSet creates - // the first client. + // the first client. When syncForever is set, it will be the most recently seen. syncInterval time.Duration // The interval by which the agent // periodically checks that it has connections to all instances of the // proxy server. @@ -57,6 +57,8 @@ type ClientSet struct { // by the server when choosing agent warnOnChannelLimit bool + + syncForever bool // Continue syncing (support dynamic server count). } func (cs *ClientSet) ClientsCount() int { @@ -124,6 +126,7 @@ type ClientSetConfig struct { DialOptions []grpc.DialOption ServiceAccountTokenPath string WarnOnChannelLimit bool + SyncForever bool } func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet { @@ -137,7 +140,8 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet syncIntervalCap: cc.SyncIntervalCap, dialOptions: cc.DialOptions, serviceAccountTokenPath: cc.ServiceAccountTokenPath, - warnOnChannelLimit: cc.WarnOnChannelLimit, + warnOnChannelLimit: cc.WarnOnChannelLimit, + syncForever: cc.SyncForever, stopCh: stopCh, } } @@ -179,7 +183,7 @@ func (cs *ClientSet) sync() { } func (cs *ClientSet) syncOnce() error { - if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { + if !cs.syncForever && cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { return nil } c, serverCount, err := cs.newAgentClient() @@ -195,6 +199,13 @@ func (cs *ClientSet) syncOnce() error { if err := cs.AddClient(c.serverID, c); err != nil { klog.ErrorS(err, "closing connection failure when adding a client") c.Close() + if cs.syncForever { + // In the SyncForever case, treat duplicate serverID case + // as an error, so that sync() will apply backoff. In the + // steady state, there will be one ongoing sync per + // syncIntervalCap. + return err + } return nil } klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)