Skip to content

Commit

Permalink
Merge pull request #624 from cnvergence/add-flag-channel-size
Browse files Browse the repository at this point in the history
Allow configuration of transport channel size in agent and server
  • Loading branch information
k8s-ci-robot authored Jun 20, 2024
2 parents e2f6996 + 31dffc5 commit fc590c6
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 29 deletions.
10 changes: 9 additions & 1 deletion cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type GrpcProxyAgentOptions struct {
// The check is an "unlocked" read but is still use at your own peril.
WarnOnChannelLimit bool

SyncForever bool
SyncForever bool
XfrChannelSize int
}

func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
Expand All @@ -93,6 +94,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
ServiceAccountTokenPath: o.ServiceAccountTokenPath,
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
XfrChannelSize: o.XfrChannelSize,
}
}

Expand All @@ -119,6 +121,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true")
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
return flags
}

Expand All @@ -144,6 +147,7 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
}

func (o *GrpcProxyAgentOptions) Validate() error {
Expand Down Expand Up @@ -177,6 +181,9 @@ func (o *GrpcProxyAgentOptions) Validate() error {
if o.AdminServerPort <= 0 {
return fmt.Errorf("admin server port %d must be greater than 0", o.AdminServerPort)
}
if o.XfrChannelSize <= 0 {
return fmt.Errorf("channel size %d must be greater than 0", o.XfrChannelSize)
}
if o.EnableContentionProfiling && !o.EnableProfiling {
return fmt.Errorf("if --enable-contention-profiling is set, --enable-profiling must also be set")
}
Expand Down Expand Up @@ -235,6 +242,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
ServiceAccountTokenPath: "",
WarnOnChannelLimit: false,
SyncForever: false,
XfrChannelSize: 150,
}
return &o
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/agent/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package options

import (
"fmt"
"github.com/stretchr/testify/assert"
"reflect"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

/*
Expand Down Expand Up @@ -49,6 +50,7 @@ func TestDefaultServerOptions(t *testing.T) {
assertDefaultValue(t, "ServiceAccountTokenPath", defaultAgentOptions.ServiceAccountTokenPath, "")
assertDefaultValue(t, "WarnOnChannelLimit", defaultAgentOptions.WarnOnChannelLimit, false)
assertDefaultValue(t, "SyncForever", defaultAgentOptions.SyncForever, false)
assertDefaultValue(t, "XfrChannelSize", defaultAgentOptions.XfrChannelSize, 150)
}

func assertDefaultValue(t *testing.T, fieldName string, actual, expected interface{}) {
Expand Down Expand Up @@ -145,6 +147,14 @@ func TestValidate(t *testing.T) {
},
expected: fmt.Errorf("if --enable-contention-profiling is set, --enable-profiling must also be set"),
},
"ZeroXfrChannelSize": {
fieldMap: map[string]interface{}{"XfrChannelSize": 0},
expected: fmt.Errorf("channel size 0 must be greater than 0"),
},
"NegativeXfrChannelSize": {
fieldMap: map[string]interface{}{"XfrChannelSize": -10},
expected: fmt.Errorf("channel size -10 must be greater than 0"),
},
} {
t.Run(desc, func(t *testing.T) {
testAgentOptions := NewGrpcProxyAgentOptions()
Expand Down
10 changes: 8 additions & 2 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ type ProxyRunOptions struct {
// also checks if given comma separated list contains cipher from tls.InsecureCipherSuites().
// NOTE that cipher suites are not configurable for TLS1.3,
// see: https://pkg.go.dev/crypto/tls#Config, so in that case, this option won't have any effect.
CipherSuites []string
CipherSuites []string
XfrChannelSize int
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand Down Expand Up @@ -136,6 +137,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.AuthenticationAudience, "authentication-audience", o.AuthenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).")
flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick an agent/tunnel, available strategies are: default, destHost, defaultRoute.")
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")

flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
Expand Down Expand Up @@ -175,6 +177,7 @@ func (o *ProxyRunOptions) Print() {
klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst)
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies)
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
}

func (o *ProxyRunOptions) Validate() error {
Expand Down Expand Up @@ -297,7 +300,9 @@ func (o *ProxyRunOptions) Validate() error {
if _, err := server.ParseProxyStrategies(o.ProxyStrategies); err != nil {
return fmt.Errorf("invalid proxy strategies: %v", err)
}

if o.XfrChannelSize <= 0 {
return fmt.Errorf("channel size %d must be greater than 0", o.XfrChannelSize)
}
// validate the cipher suites
if len(o.CipherSuites) != 0 {
acceptedCiphers := util.GetAcceptedCiphers()
Expand Down Expand Up @@ -345,6 +350,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
AuthenticationAudience: "",
ProxyStrategies: "default",
CipherSuites: make([]string, 0),
XfrChannelSize: 10,
}
return &o
}
Expand Down
12 changes: 12 additions & 0 deletions cmd/server/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func TestDefaultServerOptions(t *testing.T) {
assertDefaultValue(t, "AuthenticationAudience", defaultServerOptions.AuthenticationAudience, "")
assertDefaultValue(t, "ProxyStrategies", defaultServerOptions.ProxyStrategies, "default")
assertDefaultValue(t, "CipherSuites", defaultServerOptions.CipherSuites, make([]string, 0))
assertDefaultValue(t, "XfrChannelSize", defaultServerOptions.XfrChannelSize, 10)

}

func assertDefaultValue(t *testing.T, fieldName string, actual, expected interface{}) {
Expand Down Expand Up @@ -155,6 +157,16 @@ func TestValidate(t *testing.T) {
value: "invalid",
expected: fmt.Errorf("invalid proxy strategies: unknown proxy strategy: invalid"),
},
"ZeroXfrChannelSize": {
field: "XfrChannelSize",
value: 0,
expected: fmt.Errorf("channel size 0 must be greater than 0"),
},
"NegativeXfrChannelSize": {
field: "XfrChannelSize",
value: -10,
expected: fmt.Errorf("channel size -10 must be greater than 0"),
},
} {
t.Run(desc, func(t *testing.T) {
testServerOptions := NewProxyRunOptions()
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt)
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.XfrChannelSize)

frontendStop, err := p.runFrontendServer(ctx, o, p.server)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
)

const dialTimeout = 5 * time.Second
const xfrChannelSize = 150

// endpointConn tracks a connection from agent to node network.
type endpointConn struct {
Expand All @@ -68,7 +67,7 @@ func (e *endpointConn) send(msg []byte) {
klog.InfoS("Recovered from attempt to write to closed channel")
}
}()
if e.warnChLim && len(e.dataCh) >= xfrChannelSize {
if e.warnChLim && len(e.dataCh) >= cap(e.dataCh) {
klog.V(2).InfoS("Data channel on agent is full", "connectionID", e.connID)
}

Expand Down Expand Up @@ -377,7 +376,7 @@ func (a *Client) Serve() {
dialResp.GetDialResponse().Random = dialReq.Random

connID := atomic.AddInt64(&a.nextConnID, 1)
dataCh := make(chan []byte, xfrChannelSize)
dataCh := make(chan []byte, a.cs.xfrChannelSize)
dialDone := make(chan struct{})
eConn := &endpointConn{
dataCh: dataCh,
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type ClientSet struct {
// by the server when choosing agent

warnOnChannelLimit bool
xfrChannelSize int

syncForever bool // Continue syncing (support dynamic server count).
}
Expand Down Expand Up @@ -141,6 +142,7 @@ type ClientSetConfig struct {
ServiceAccountTokenPath string
WarnOnChannelLimit bool
SyncForever bool
XfrChannelSize int
}

func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
Expand All @@ -157,6 +159,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
warnOnChannelLimit: cc.WarnOnChannelLimit,
syncForever: cc.SyncForever,
drainCh: drainCh,
xfrChannelSize: cc.XfrChannelSize,
stopCh: stopCh,
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ import (
"sigs.k8s.io/apiserver-network-proxy/proto/header"
)

const xfrChannelSize = 10

type key int

type GrpcFrontend struct {
Expand Down Expand Up @@ -218,6 +216,7 @@ type ProxyServer struct {

// TODO: move strategies into BackendStorage
proxyStrategies []ProxyStrategy
xfrChannelSize int
}

// AgentTokenAuthenticationOptions contains list of parameters required for agent token based authentication
Expand Down Expand Up @@ -376,7 +375,7 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien
}

// NewProxyServer creates a new ProxyServer instance
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer {
var bms []BackendManager
for _, ps := range proxyStrategies {
switch ps {
Expand All @@ -401,6 +400,7 @@ func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCoun
// use the first backend-manager as the Readiness Manager
Readiness: bms[0],
proxyStrategies: proxyStrategies,
xfrChannelSize: channelSize,
}
}

Expand All @@ -417,7 +417,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
streamUID := uuid.New().String()
klog.V(5).InfoS("Proxy request from client", "userAgent", userAgent, "serverID", s.serverID, "streamUID", streamUID)

recvCh := make(chan *client.Packet, xfrChannelSize)
recvCh := make(chan *client.Packet, s.xfrChannelSize)
stopCh := make(chan error, 1)

frontend := GrpcFrontend{
Expand Down Expand Up @@ -745,7 +745,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
s.addBackend(backend)
defer s.removeBackend(backend)

recvCh := make(chan *client.Packet, xfrChannelSize)
recvCh := make(chan *client.Packet, s.xfrChannelSize)

go runpprof.Do(context.Background(), labels, func(context.Context) { s.serveRecvBackend(backend, agentID, recvCh) })

Expand Down
Loading

0 comments on commit fc590c6

Please sign in to comment.