Skip to content

Commit

Permalink
proxy-agent: support new SyncForever mode.
Browse files Browse the repository at this point in the history
This is a minimal proxy-agent half of
kubernetes-sigs#273

It is already useful, in the scenario where proxy-server are restarted.
  • Loading branch information
jkh52 committed Nov 5, 2021
1 parent a29aa2a commit 64d0511
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
10 changes: 8 additions & 2 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type GrpcProxyAgentOptions struct {
SyncIntervalCap time.Duration
// After a duration of this time if the agent doesn't see any activity it
// pings the server to see if the transport is still alive.
KeepaliveTime time.Duration
KeepaliveTime time.Duration

// file contains service account authorization token for enabling proxy-server token based authorization
ServiceAccountTokenPath string
Expand All @@ -53,6 +53,8 @@ type GrpcProxyAgentOptions struct {
// blocking call has its own problems, so it cannot easily be made race condition safe.
// The check is an "unlocked" read but is still use at your own peril.
WarnOnChannelLimit bool

SyncForever bool
}

func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
Expand All @@ -65,7 +67,8 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
SyncIntervalCap: o.SyncIntervalCap,
DialOptions: dialOptions,
ServiceAccountTokenPath: o.ServiceAccountTokenPath,
WarnOnChannelLimit: o.WarnOnChannelLimit,
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
}
}

Expand All @@ -89,6 +92,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.ServiceAccountTokenPath, "service-account-token-path", o.ServiceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.")
flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true")
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "When used, the agent continues syncing, in order to support server count changes.")
return flags
}

Expand All @@ -111,6 +115,7 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.ServiceAccountTokenPath)
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
}

func (o *GrpcProxyAgentOptions) Validate() error {
Expand Down Expand Up @@ -199,6 +204,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
KeepaliveTime: 1 * time.Hour,
ServiceAccountTokenPath: "",
WarnOnChannelLimit: false,
SyncForever: false,
}
return &o
}
17 changes: 14 additions & 3 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ClientSet struct {
address string // proxy server address. Assuming HA proxy server
serverCount int // number of proxy server instances, should be 1
// unless it is an HA server. Initialized when the ClientSet creates
// the first client.
// the first client. When syncForever is set, it will be the most recently seen.
syncInterval time.Duration // The interval by which the agent
// periodically checks that it has connections to all instances of the
// proxy server.
Expand All @@ -57,6 +57,8 @@ type ClientSet struct {
// by the server when choosing agent

warnOnChannelLimit bool

syncForever bool // Continue syncing (support dynamic server count).
}

func (cs *ClientSet) ClientsCount() int {
Expand Down Expand Up @@ -124,6 +126,7 @@ type ClientSetConfig struct {
DialOptions []grpc.DialOption
ServiceAccountTokenPath string
WarnOnChannelLimit bool
SyncForever bool
}

func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet {
Expand All @@ -137,7 +140,8 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
syncIntervalCap: cc.SyncIntervalCap,
dialOptions: cc.DialOptions,
serviceAccountTokenPath: cc.ServiceAccountTokenPath,
warnOnChannelLimit: cc.WarnOnChannelLimit,
warnOnChannelLimit: cc.WarnOnChannelLimit,
syncForever: cc.SyncForever,
stopCh: stopCh,
}
}
Expand Down Expand Up @@ -179,7 +183,7 @@ func (cs *ClientSet) sync() {
}

func (cs *ClientSet) syncOnce() error {
if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
if !cs.syncForever && cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
return nil
}
c, serverCount, err := cs.newAgentClient()
Expand All @@ -195,6 +199,13 @@ func (cs *ClientSet) syncOnce() error {
if err := cs.AddClient(c.serverID, c); err != nil {
klog.ErrorS(err, "closing connection failure when adding a client")
c.Close()
if cs.syncForever {
// In the SyncForever case, treat duplicate serverID case
// as an error, so that sync() will apply backoff. In the
// steady state, there will be one ongoing sync per
// syncIntervalCap.
return err
}
return nil
}
klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
Expand Down

0 comments on commit 64d0511

Please sign in to comment.