Skip to content

Commit

Permalink
Merge pull request #135 from liuh-80/dev/liuh/cherry-pick-zmq
Browse files Browse the repository at this point in the history
Integrate ZMQ to GNMI to improve Dash GNMI API performance. 

#### Why I did it
Currently Dash gnmi to orchagent communitcation using redis based channel, switch to ZMQ based change will improve performance. 

#### How I did it
Integrate ZMQ to gnmi service.

#### How to verify it
Manually test.
Add new UT.

#### Work item tracking
Microsoft ADO (number only): 17753835

#### Which release branch to backport (provide reason below if selected)

<!--
- Note we only backport fixes to a release branch, *not* features!
- Please also provide a reason for the backporting below.
- e.g.
- [x] 202006
-->

- [ ] 201811
- [ ] 201911
- [ ] 202006
- [ ] 202012
- [ ] 202106
- [ ] 202111

#### Description for the changelog
Integrate ZMQ to GNMI to improve Dash GNMI API performance. 

#### Link to config_db schema for YANG module changes
<!--
Provide a link to config_db schema for the table for which YANG model
is defined
Link should point to correct section on https://github.com/Azure/SONiC/wiki/Configuration.
-->

#### A picture of a cute animal (not mandatory but encouraged)
  • Loading branch information
liuh-80 authored Jul 21, 2023
2 parents 610509b + f8d9c7e commit fb338d5
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 22 deletions.
7 changes: 4 additions & 3 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Config struct {
UserAuth AuthTypes
EnableTranslibWrite bool
EnableNativeWrite bool
ZmqAddress string
IdleConnDuration int
}

Expand Down Expand Up @@ -343,7 +344,7 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe
return nil, err
}
if check := IsNativeOrigin(origin); check {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, s.config.ZmqAddress)
} else {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions)
}
Expand Down Expand Up @@ -410,7 +411,7 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
return nil, grpc.Errorf(codes.Unimplemented, "GNMI native write is disabled")
}
dc, err = sdc.NewMixedDbClient(paths, prefix, origin)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, s.config.ZmqAddress)
} else {
if s.config.EnableTranslibWrite == false {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
Expand Down Expand Up @@ -485,7 +486,7 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest
var supportedModels []gnmipb.ModelData
dc, _ := sdc.NewTranslClient(nil, nil, ctx, extensions)
supportedModels = append(supportedModels, dc.Capabilities()...)
dc, _ = sdc.NewMixedDbClient(nil, nil, "")
dc, _ = sdc.NewMixedDbClient(nil, nil, "", s.config.ZmqAddress)
supportedModels = append(supportedModels, dc.Capabilities()...)

suppModels := make([]*gnmipb.ModelData, len(supportedModels))
Expand Down
94 changes: 94 additions & 0 deletions sonic_data_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"errors"
"testing"
"os"
"time"
"reflect"
"io/ioutil"
"encoding/json"
"fmt"

"github.com/jipanyang/gnxi/utils/xpath"
"github.com/sonic-net/sonic-gnmi/swsscommon"
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
)

Expand Down Expand Up @@ -349,3 +352,94 @@ func TestNonDbClientGetError(t *testing.T) {
t.Errorf("Expected error from NonDbClient.Get, got nil")
}
}

/*
Helper method for receive data from ZmqConsumerStateTable
consumer: Receive data from consumer
return:
true: data received
false: not receive any data after retry
*/
func ReceiveFromZmq(consumer swsscommon.ZmqConsumerStateTable) (bool) {
receivedData := swsscommon.NewKeyOpFieldsValuesQueue()
retry := 0;
for {
// sender's ZMQ may disconnect, wait and retry for reconnect
time.Sleep(time.Duration(1000) * time.Millisecond)
consumer.Pops(receivedData)
if receivedData.Size() == 0 {
retry++
if retry >= 10 {
return false
}
} else {
return true
}
}
}

func TestZmqReconnect(t *testing.T) {
// create ZMQ server
db := swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false)
zmqServer := swsscommon.NewZmqServer("tcp://*:1234")
var TEST_TABLE string = "DASH_ROUTE"
consumer := swsscommon.NewZmqConsumerStateTable(db, TEST_TABLE, zmqServer)

// create ZMQ client side
zmqAddress := "tcp://127.0.0.1:1234"
client := MixedDbClient {
applDB : swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false),
tableMap : map[string]swsscommon.ProducerStateTable{},
zmqClient : swsscommon.NewZmqClient(zmqAddress),
}

data := map[string]string{}
var TEST_KEY string = "TestKey"
client.DbSetTable(TEST_TABLE, TEST_KEY, data)
if !ReceiveFromZmq(consumer) {
t.Errorf("Receive data from ZMQ failed")
}

// recreate ZMQ server to trigger re-connect
swsscommon.DeleteZmqConsumerStateTable(consumer)
swsscommon.DeleteZmqServer(zmqServer)
zmqServer = swsscommon.NewZmqServer("tcp://*:1234")
consumer = swsscommon.NewZmqConsumerStateTable(db, TEST_TABLE, zmqServer)

// send data again, client will reconnect
client.DbSetTable(TEST_TABLE, TEST_KEY, data)
if !ReceiveFromZmq(consumer) {
t.Errorf("Receive data from ZMQ failed")
}
}

func TestRetryHelper(t *testing.T) {
// create ZMQ server
zmqServer := swsscommon.NewZmqServer("tcp://*:2234")

// create ZMQ client side
zmqAddress := "tcp://127.0.0.1:2234"
zmqClient := swsscommon.NewZmqClient(zmqAddress)
returnError := true
exeCount := 0
RetryHelper(
zmqClient,
func () (err error) {
exeCount++
if returnError {
returnError = false
return fmt.Errorf("connection_reset")
}
return nil
})

if exeCount == 1 {
t.Errorf("RetryHelper does not retry")
}

if exeCount > 2 {
t.Errorf("RetryHelper retry too much")
}

swsscommon.DeleteZmqServer(zmqServer)
}
126 changes: 107 additions & 19 deletions sonic_data_client/mixed_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ import (

const REDIS_SOCK string = "/var/run/redis/redis.sock"
const APPL_DB int = 0
const APPL_DB_NAME string = "APPL_DB"
const DASH_TABLE_PREFIX string = "DASH_"
const SWSS_TIMEOUT uint = 0
const MAX_RETRY_COUNT uint = 5
const RETRY_DELAY_MILLISECOND uint = 100
const RETRY_DELAY_FACTOR uint = 2
const CHECK_POINT_PATH string = "/etc/sonic"

const (
Expand Down Expand Up @@ -59,13 +64,37 @@ type MixedDbClient struct {
workPath string
jClient *JsonClient
applDB swsscommon.DBConnector
zmqClient swsscommon.ZmqClient
tableMap map[string]swsscommon.ProducerStateTable

synced sync.WaitGroup // Control when to send gNMI sync_response
w *sync.WaitGroup // wait for all sub go routines to finish
mu sync.RWMutex // Mutex for data protection among routines for DbClient
}

var mixedDbClientMap = map[string]MixedDbClient{}

func getMixedDbClient(zmqAddress string) (MixedDbClient) {
client, ok := mixedDbClientMap[zmqAddress]
if !ok {
client = MixedDbClient {
applDB : swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false),
tableMap : map[string]swsscommon.ProducerStateTable{},
}

// enable ZMQ by zmqAddress parameter
if zmqAddress != "" {
client.zmqClient = swsscommon.NewZmqClient(zmqAddress)
} else {
client.zmqClient = nil
}

mixedDbClientMap[zmqAddress] = client
}

return client
}

func parseJson(str []byte) (interface{}, error) {
var res interface{}
err := json.Unmarshal(str, &res)
Expand Down Expand Up @@ -98,42 +127,107 @@ func ParseTarget(target string, paths []*gnmipb.Path) (string, error) {
return target, nil
}

func (c *MixedDbClient) DbSetTable(table string, key string, values map[string]string) error {
func (c *MixedDbClient) GetTable(table string) (swsscommon.ProducerStateTable) {
pt, ok := c.tableMap[table]
if !ok {
pt = swsscommon.NewProducerStateTable(c.applDB, table)
if strings.HasPrefix(table, DASH_TABLE_PREFIX) && c.zmqClient != nil {
log.V(2).Infof("Create ZmqProducerStateTable: %s", table)
pt = swsscommon.NewZmqProducerStateTable(c.applDB, table, c.zmqClient)
} else {
log.V(2).Infof("Create ProducerStateTable: %s", table)
pt = swsscommon.NewProducerStateTable(c.applDB, table)
}

c.tableMap[table] = pt
}

return pt
}

func CatchException(err *error) {
if r := recover(); r != nil {
*err = fmt.Errorf("%v", r)
}
}

func ProducerStateTableSetWrapper(pt swsscommon.ProducerStateTable, key string, value swsscommon.FieldValuePairs) (err error) {
// convert panic to error
defer CatchException(&err)
pt.Set(key, value, "SET", "")
return
}

func ProducerStateTableDeleteWrapper(pt swsscommon.ProducerStateTable, key string) (err error) {
// convert panic to error
defer CatchException(&err)
pt.Delete(key, "DEL", "")
return
}

type ActionNeedRetry func() error

func RetryHelper(zmqClient swsscommon.ZmqClient, action ActionNeedRetry) {
var retry uint = 0
var retry_delay = time.Duration(RETRY_DELAY_MILLISECOND) * time.Millisecond
ConnectionResetErr := "connection_reset"
for {
err := action()
if err != nil {
if (err.Error() == ConnectionResetErr && retry <= MAX_RETRY_COUNT) {
log.V(6).Infof("RetryHelper: connection reset, reconnect and retry later")
time.Sleep(retry_delay)

zmqClient.Connect()
retry_delay *= time.Duration(RETRY_DELAY_FACTOR)
retry++
continue
}

panic(err)
}

return
}
}

func (c *MixedDbClient) DbSetTable(table string, key string, values map[string]string) error {
vec := swsscommon.NewFieldValuePairs()
defer swsscommon.DeleteFieldValuePairs(vec)
for k, v := range values {
pair := swsscommon.NewFieldValuePair(k, v)
vec.Add(pair)
swsscommon.DeleteFieldValuePair(pair)
}
pt.Set(key, vec, "SET", "")

pt := c.GetTable(table)
RetryHelper(
c.zmqClient,
func () error {
return ProducerStateTableSetWrapper(pt, key, vec)
})
return nil
}

func (c *MixedDbClient) DbDelTable(table string, key string) error {
pt, ok := c.tableMap[table]
if !ok {
pt = swsscommon.NewProducerStateTable(c.applDB, table)
c.tableMap[table] = pt
}
pt.Delete(key, "DEL", "")
pt := c.GetTable(table)
RetryHelper(
c.zmqClient,
func () error {
return ProducerStateTableDeleteWrapper(pt, key)
})

return nil
}

func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string) (Client, error) {
var client MixedDbClient
func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, zmqAddress string) (Client, error) {
var err error

// Testing program may ask to use redis local tcp connection
if UseRedisLocalTcpPort {
useRedisTcpClient()
}

var client = getMixedDbClient(zmqAddress)
client.prefix = prefix
client.target = ""
client.origin = origin
Expand All @@ -159,8 +253,6 @@ func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string)
}
client.paths = paths
client.workPath = common_utils.GNMI_WORK_PATH
client.applDB = swsscommon.NewDBConnector(APPL_DB, REDIS_SOCK, SWSS_TIMEOUT)
client.tableMap = map[string]swsscommon.ProducerStateTable{}

return &client, nil
}
Expand Down Expand Up @@ -1059,16 +1151,12 @@ func (c *MixedDbClient) Capabilities() []gnmipb.ModelData {
}

func (c *MixedDbClient) Close() error {
for _, pt := range c.tableMap {
swsscommon.DeleteProducerStateTable(pt)
}
swsscommon.DeleteDBConnector(c.applDB)
// Do nothing here, because MixedDbClient will be cache in mixedDbClientMap and reuse
return nil
}

func (c *MixedDbClient) SentOne(val *Value) {
}

func (c *MixedDbClient) FailedSend() {
}

}
2 changes: 2 additions & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.")
serverCert = flag.String("server_crt", "", "TLS server certificate")
serverKey = flag.String("server_key", "", "TLS server private key")
zmqAddress = flag.String("zmq_address", "", "Orchagent ZMQ address, when not set or empty string telemetry server will switch to Redis based communication channel.")
insecure = flag.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!")
noTLS = flag.Bool("noTLS", false, "disable TLS, for testing only!")
allowNoClientCert = flag.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate.")
Expand Down Expand Up @@ -81,6 +82,7 @@ func main() {
cfg.EnableTranslibWrite = bool(*gnmi_translib_write)
cfg.EnableNativeWrite = bool(*gnmi_native_write)
cfg.LogLevel = 3
cfg.ZmqAddress = *zmqAddress
cfg.Threshold = int(*threshold)
cfg.IdleConnDuration = int(*idle_conn_duration)
var opts []grpc.ServerOption
Expand Down

0 comments on commit fb338d5

Please sign in to comment.