From 2a115c2ff31644651f4004ff1b4ac39cdff76c37 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 27 Jan 2020 11:46:31 -0800 Subject: [PATCH] do exponential backoff in clientset sync function --- cmd/agent/main.go | 2 +- pkg/agent/agentclient/clientset.go | 87 ++++++++++++++++++------------ 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 81062032f..ec0bd0ae0 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -93,7 +93,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.StringVar(&o.proxyServerHost, "proxy-server-host", o.proxyServerHost, "The hostname to use to connect to the proxy-server.") flags.IntVar(&o.proxyServerPort, "proxy-server-port", o.proxyServerPort, "The port the proxy server is listening on.") flags.StringVar(&o.agentID, "agent-id", o.agentID, "The unique ID of this agent. Default to a generated uuid if not set.") - flags.DurationVar(&o.syncInterval, "sync-interval", o.syncInterval, "The interval by which the agent periodically checks that it has connections to all instances of the proxy server.") + flags.DurationVar(&o.syncInterval, "sync-interval", o.syncInterval, "The initial interval by which the agent periodically checks if it has connections to all instances of the proxy server.") flags.DurationVar(&o.probeInterval, "probe-interval", o.probeInterval, "The interval by which the agent periodically checks if its connections to the proxy server are ready.") flags.DurationVar(&o.reconnectInterval, "reconnect-interval", o.reconnectInterval, "The interval by which the agent tries to reconnect.") flags.StringVar(&o.serviceAccountTokenPath, "service-account-token-path", o.serviceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.") diff --git a/pkg/agent/agentclient/clientset.go b/pkg/agent/agentclient/clientset.go index 67867fc7f..d05102aa2 100644 --- a/pkg/agent/agentclient/clientset.go +++ b/pkg/agent/agentclient/clientset.go @@ -18,12 +18,12 @@ package agentclient import ( "fmt" - "math/rand" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" ) @@ -31,7 +31,7 @@ import ( type ClientSet struct { mu sync.Mutex //protects the clients. clients map[string]*AgentClient // map between serverID and the client - // connects to this proxy server. + // connects to this server. agentID string // ID of this agent address string // proxy server address. Assuming HA proxy server @@ -101,24 +101,24 @@ func (cs *ClientSet) RemoveClient(serverID string) { } type ClientSetConfig struct { - Address string - AgentID string - SyncInterval time.Duration - ProbeInterval time.Duration - ReconnectInterval time.Duration - DialOption grpc.DialOption + Address string + AgentID string + SyncInterval time.Duration + ProbeInterval time.Duration + ReconnectInterval time.Duration + DialOption grpc.DialOption ServiceAccountTokenPath string } func (cc *ClientSetConfig) NewAgentClientSet() *ClientSet { return &ClientSet{ - clients: make(map[string]*AgentClient), - agentID: cc.AgentID, - address: cc.Address, - syncInterval: cc.SyncInterval, - probeInterval: cc.ProbeInterval, - reconnectInterval: cc.ReconnectInterval, - dialOption: cc.DialOption, + clients: make(map[string]*AgentClient), + agentID: cc.AgentID, + address: cc.Address, + syncInterval: cc.SyncInterval, + probeInterval: cc.ProbeInterval, + reconnectInterval: cc.ReconnectInterval, + dialOption: cc.DialOption, serviceAccountTokenPath: cc.ServiceAccountTokenPath, } @@ -128,30 +128,49 @@ func (cs *ClientSet) newAgentClient() (*AgentClient, error) { return newAgentClient(cs.address, cs.agentID, cs, cs.dialOption) } +func (cs *ClientSet) resetBackoff() *wait.Backoff { + return &wait.Backoff{ + Steps: 3, + Jitter: 0.1, + Factor: 1.5, + Duration: cs.syncInterval, + Cap: 60 * time.Second, + } +} + // sync makes sure that #clients >= #proxy servers func (cs *ClientSet) sync() { - jitter := float64(0.2) + backoff := cs.resetBackoff() + var duration time.Duration for { - if cs.serverCount != 0 { - sleep := cs.syncInterval + time.Duration(rand.Float64()*jitter*float64(cs.syncInterval)) - time.Sleep(sleep) - } - if cs.serverCount == 0 || cs.ClientsCount() < cs.serverCount { - c, err := cs.newAgentClient() - if err != nil { - klog.Error(err) - continue - } - cs.serverCount = c.stream.serverCount - if err := cs.AddClient(c.stream.serverID, c); err != nil { - klog.Infof("closing connection: %v", err) - c.Close() - continue - } - klog.Infof("sync added client connecting to proxy server %s", c.stream.serverID) - go c.Serve() + if err := cs.syncOnce(); err != nil { + klog.Error(err) + duration = backoff.Step() + } else { + backoff = cs.resetBackoff() + duration = wait.Jitter(backoff.Duration, backoff.Jitter) } + time.Sleep(duration) + } +} + +func (cs *ClientSet) syncOnce() error { + if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { + return nil + } + c, err := cs.newAgentClient() + if err != nil { + return err } + cs.serverCount = c.stream.serverCount + if err := cs.AddClient(c.stream.serverID, c); err != nil { + klog.Infof("closing connection: %v", err) + c.Close() + return nil + } + klog.Infof("sync added client connecting to proxy server %s", c.stream.serverID) + go c.Serve() + return nil } func (cs *ClientSet) Serve() {