From ed9e1200818f78c7056c3dff6ea4ba1c4a7993bc Mon Sep 17 00:00:00 2001 From: Joseph Anttila Hall Date: Thu, 4 Nov 2021 17:23:45 -0700 Subject: [PATCH] proxy-agent: support new SyncForever mode. This is a minimal proxy-agent half of https://github.com/kubernetes-sigs/apiserver-network-proxy/issues/273 It is already useful, in the scenario where proxy-servers are restarted. --- cmd/agent/app/options/options.go | 10 ++++-- pkg/agent/clientset.go | 55 ++++++++++++++++++++++---------- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index 2df464b84..bb35f2297 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -42,7 +42,7 @@ type GrpcProxyAgentOptions struct { SyncIntervalCap time.Duration // After a duration of this time if the agent doesn't see any activity it // pings the server to see if the transport is still alive. - KeepaliveTime time.Duration + KeepaliveTime time.Duration // file contains service account authorization token for enabling proxy-server token based authorization ServiceAccountTokenPath string @@ -53,6 +53,8 @@ type GrpcProxyAgentOptions struct { // blocking call has its own problems, so it cannot easily be made race condition safe. // The check is an "unlocked" read but is still use at your own peril. 
WarnOnChannelLimit bool + + SyncForever bool } func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig { @@ -65,7 +67,8 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) SyncIntervalCap: o.SyncIntervalCap, DialOptions: dialOptions, ServiceAccountTokenPath: o.ServiceAccountTokenPath, - WarnOnChannelLimit: o.WarnOnChannelLimit, + WarnOnChannelLimit: o.WarnOnChannelLimit, + SyncForever: o.SyncForever, } } @@ -89,6 +92,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.StringVar(&o.ServiceAccountTokenPath, "service-account-token-path", o.ServiceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.") flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true") flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. 
The check involves an unsafe read.") + flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "When used, the agent continues syncing, in order to support server count changes.") return flags } @@ -111,6 +115,7 @@ func (o *GrpcProxyAgentOptions) Print() { klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.ServiceAccountTokenPath) klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers)) klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit) + klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever) } func (o *GrpcProxyAgentOptions) Validate() error { @@ -199,6 +204,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { KeepaliveTime: 1 * time.Hour, ServiceAccountTokenPath: "", WarnOnChannelLimit: false, + SyncForever: false, } return &o } diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index f9efdda1b..fd8d95d70 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -17,7 +17,6 @@ limitations under the License. package agent import ( - "fmt" "math" "sync" "time" @@ -38,7 +37,7 @@ type ClientSet struct { address string // proxy server address. Assuming HA proxy server serverCount int // number of proxy server instances, should be 1 // unless it is an HA server. Initialized when the ClientSet creates - // the first client. + // the first client. When syncForever is set, it will be the most recently seen. syncInterval time.Duration // The interval by which the agent // periodically checks that it has connections to all instances of the // proxy server. @@ -57,6 +56,8 @@ type ClientSet struct { // by the server when choosing agent warnOnChannelLimit bool + + syncForever bool // Continue syncing (support dynamic server count). 
} func (cs *ClientSet) ClientsCount() int { @@ -89,9 +90,17 @@ func (cs *ClientSet) HasID(serverID string) bool { return cs.hasIDLocked(serverID) } +type DuplicateServerError struct { + ServerID string +} + +func (dse *DuplicateServerError) Error() string { + return "duplicate server: " + dse.ServerID +} + func (cs *ClientSet) addClientLocked(serverID string, c *Client) error { if cs.hasIDLocked(serverID) { - return fmt.Errorf("client for proxy server %s already exists", serverID) + return &DuplicateServerError{ServerID: serverID} } cs.clients[serverID] = c return nil @@ -124,6 +133,7 @@ type ClientSetConfig struct { DialOptions []grpc.DialOption ServiceAccountTokenPath string WarnOnChannelLimit bool + SyncForever bool } func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet { @@ -137,7 +147,8 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet syncIntervalCap: cc.SyncIntervalCap, dialOptions: cc.DialOptions, serviceAccountTokenPath: cc.ServiceAccountTokenPath, - warnOnChannelLimit: cc.WarnOnChannelLimit, + warnOnChannelLimit: cc.WarnOnChannelLimit, + syncForever: cc.SyncForever, stopCh: stopCh, } } @@ -162,12 +173,19 @@ func (cs *ClientSet) sync() { backoff := cs.resetBackoff() var duration time.Duration for { - if err := cs.syncOnce(); err != nil { - klog.ErrorS(err, "cannot sync once") - duration = backoff.Step() - } else { + err := cs.connectOnce() + switch err := err.(type) { + case nil: backoff = cs.resetBackoff() duration = wait.Jitter(backoff.Duration, backoff.Jitter) + case *DuplicateServerError: + klog.V(5).InfoS("duplicate server", "serverCount", cs.serverCount, "clientsCount", cs.ClientsCount()) + if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { + duration = backoff.Step() + } + default: + klog.ErrorS(err, "cannot sync once") + duration = backoff.Step() } time.Sleep(duration) select { @@ -178,8 +196,8 @@ func (cs *ClientSet) sync() { } } -func (cs *ClientSet) syncOnce() error { - if 
cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { +func (cs *ClientSet) connectOnce() error { + if !cs.syncForever && cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { return nil } c, serverCount, err := cs.newAgentClient() @@ -192,14 +210,19 @@ func (cs *ClientSet) syncOnce() error { } cs.serverCount = serverCount - if err := cs.AddClient(c.serverID, c); err != nil { - klog.ErrorS(err, "closing connection failure when adding a client") - c.Close() + err = cs.AddClient(c.serverID, c) + switch err := err.(type) { + case nil: + klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID) + go c.Serve() return nil + case *DuplicateServerError: + klog.V(2).InfoS("closing connection to duplicate server", "serverID", err.ServerID) + c.Close() + return err + default: + return err } - klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID) - go c.Serve() - return nil } func (cs *ClientSet) Serve() {