diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go
index 9c7010cda..d15ed9e5d 100644
--- a/cmd/agent/app/options/options.go
+++ b/cmd/agent/app/options/options.go
@@ -53,6 +53,8 @@ type GrpcProxyAgentOptions struct {
 	// blocking call has its own problems, so it cannot easily be made race condition safe.
 	// The check is an "unlocked" read but is still use at your own peril.
 	WarnOnChannelLimit bool
+
+	SyncForever bool
 }
 
 func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
@@ -66,6 +68,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
 		DialOptions:             dialOptions,
 		ServiceAccountTokenPath: o.ServiceAccountTokenPath,
 		WarnOnChannelLimit:      o.WarnOnChannelLimit,
+		SyncForever:             o.SyncForever,
 	}
 }
 
@@ -89,6 +92,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
 	flags.StringVar(&o.ServiceAccountTokenPath, "service-account-token-path", o.ServiceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.")
 	flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true")
 	flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
+	flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
 	return flags
 }
 
@@ -111,6 +115,7 @@ func (o *GrpcProxyAgentOptions) Print() {
 	klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.ServiceAccountTokenPath)
 	klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
 	klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
+	klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
 }
 
 func (o *GrpcProxyAgentOptions) Validate() error {
@@ -199,6 +204,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
 		KeepaliveTime:           1 * time.Hour,
 		ServiceAccountTokenPath: "",
 		WarnOnChannelLimit:      false,
+		SyncForever:             false,
 	}
 	return &o
 }
diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go
index 23a613342..6995e469f 100644
--- a/pkg/agent/clientset.go
+++ b/pkg/agent/clientset.go
@@ -17,7 +17,6 @@ limitations under the License.
 package agent
 
 import (
-	"fmt"
 	"math"
 	"sync"
 	"time"
@@ -38,7 +37,7 @@ type ClientSet struct {
 	address     string // proxy server address. Assuming HA proxy server
 	serverCount int    // number of proxy server instances, should be 1
 	// unless it is an HA server. Initialized when the ClientSet creates
-	// the first client.
+	// the first client. When syncForever is set, it will be the most recently seen.
 	syncInterval time.Duration // The interval by which the agent
 	// periodically checks that it has connections to all instances of the
 	// proxy server.
@@ -57,6 +56,8 @@ type ClientSet struct {
 	// by the server when choosing agent
 
 	warnOnChannelLimit bool
+
+	syncForever bool // Continue syncing (support dynamic server count).
 }
 
 func (cs *ClientSet) ClientsCount() int {
@@ -89,9 +90,19 @@ func (cs *ClientSet) HasID(serverID string) bool {
 	return cs.hasIDLocked(serverID)
 }
 
+// DuplicateServerError is returned when a client for ServerID is already registered in the ClientSet.
+type DuplicateServerError struct {
+	ServerID string
+}
+
+// Error implements the error interface.
+func (dse *DuplicateServerError) Error() string {
+	return "duplicate server: " + dse.ServerID
+}
+
 func (cs *ClientSet) addClientLocked(serverID string, c *Client) error {
 	if cs.hasIDLocked(serverID) {
-		return fmt.Errorf("client for proxy server %s already exists", serverID)
+		return &DuplicateServerError{ServerID: serverID}
 	}
 	cs.clients[serverID] = c
 	return nil
@@ -124,6 +135,7 @@ type ClientSetConfig struct {
 	DialOptions             []grpc.DialOption
 	ServiceAccountTokenPath string
 	WarnOnChannelLimit      bool
+	SyncForever             bool
 }
 
 func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet {
@@ -138,6 +150,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
 		dialOptions:             cc.DialOptions,
 		serviceAccountTokenPath: cc.ServiceAccountTokenPath,
 		warnOnChannelLimit:      cc.WarnOnChannelLimit,
+		syncForever:             cc.SyncForever,
 		stopCh:                  stopCh,
 	}
 }
@@ -162,9 +175,17 @@ func (cs *ClientSet) sync() {
 	backoff := cs.resetBackoff()
 	var duration time.Duration
 	for {
-		if err := cs.syncOnce(); err != nil {
-			klog.ErrorS(err, "cannot sync once")
-			duration = backoff.Step()
+		if err := cs.connectOnce(); err != nil {
+			// Only test the error type here: rebinding err in this if-initializer would shadow it in the else branch with a nil *DuplicateServerError.
+			if _, ok := err.(*DuplicateServerError); ok {
+				klog.V(5).InfoS("duplicate server", "serverCount", cs.serverCount, "clientsCount", cs.ClientsCount())
+				if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
+					duration = backoff.Step()
+				}
+			} else {
+				klog.ErrorS(err, "cannot sync once")
+				duration = backoff.Step()
+			}
 		} else {
 			backoff = cs.resetBackoff()
 			duration = wait.Jitter(backoff.Duration, backoff.Jitter)
@@ -178,8 +199,8 @@ func (cs *ClientSet) sync() {
 	}
 }
 
-func (cs *ClientSet) syncOnce() error {
-	if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
+func (cs *ClientSet) connectOnce() error {
+	if !cs.syncForever && cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
 		return nil
 	}
 	c, serverCount, err := cs.newAgentClient()
@@ -193,9 +214,13 @@ func (cs *ClientSet) syncOnce() error {
 	}
 	cs.serverCount = serverCount
 	if err := cs.AddClient(c.serverID, c); err != nil {
-		klog.ErrorS(err, "closing connection failure when adding a client")
+		if dse, ok := err.(*DuplicateServerError); ok {
+			klog.V(2).InfoS("closing connection to duplicate server", "serverID", dse.ServerID)
+		} else {
+			klog.ErrorS(err, "closing connection failure when adding a client")
+		}
 		c.Close()
-		return nil
+		return err
 	}
 	klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
 	go c.Serve()