diff --git a/gnmi_server/server.go b/gnmi_server/server.go index 9e7774a8..10f087d8 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -52,6 +52,7 @@ type Config struct { UserAuth AuthTypes EnableTranslibWrite bool EnableNativeWrite bool + ZmqAddress string IdleConnDuration int } @@ -343,7 +344,7 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe return nil, err } if check := IsNativeOrigin(origin); check { - dc, err = sdc.NewMixedDbClient(paths, prefix, origin) + dc, err = sdc.NewMixedDbClient(paths, prefix, origin, s.config.ZmqAddress) } else { dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions) } @@ -410,7 +411,7 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe common_utils.IncCounter(common_utils.GNMI_SET_FAIL) return nil, grpc.Errorf(codes.Unimplemented, "GNMI native write is disabled") } - dc, err = sdc.NewMixedDbClient(paths, prefix, origin) + dc, err = sdc.NewMixedDbClient(paths, prefix, origin, s.config.ZmqAddress) } else { if s.config.EnableTranslibWrite == false { common_utils.IncCounter(common_utils.GNMI_SET_FAIL) @@ -485,7 +486,7 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest var supportedModels []gnmipb.ModelData dc, _ := sdc.NewTranslClient(nil, nil, ctx, extensions) supportedModels = append(supportedModels, dc.Capabilities()...) - dc, _ = sdc.NewMixedDbClient(nil, nil, "") + dc, _ = sdc.NewMixedDbClient(nil, nil, "", s.config.ZmqAddress) supportedModels = append(supportedModels, dc.Capabilities()...) suppModels := make([]*gnmipb.ModelData, len(supportedModels)) diff --git a/sonic_data_client/client_test.go b/sonic_data_client/client_test.go index e763cc41..320938b3 100644 --- a/sonic_data_client/client_test.go +++ b/sonic_data_client/client_test.go @@ -5,11 +5,14 @@ import ( "errors" "testing" "os" + "time" "reflect" "io/ioutil" "encoding/json" + "fmt" "github.com/jipanyang/gnxi/utils/xpath" + "github.com/sonic-net/sonic-gnmi/swsscommon" gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) @@ -349,3 +352,94 @@ func TestNonDbClientGetError(t *testing.T) { t.Errorf("Expected error from NonDbClient.Get, got nil") } } + +/* + Helper method for receive data from ZmqConsumerStateTable + consumer: Receive data from consumer + return: + true: data received + false: not receive any data after retry +*/ +func ReceiveFromZmq(consumer swsscommon.ZmqConsumerStateTable) (bool) { + receivedData := swsscommon.NewKeyOpFieldsValuesQueue() + retry := 0; + for { + // sender's ZMQ may disconnect, wait and retry for reconnect + time.Sleep(time.Duration(1000) * time.Millisecond) + consumer.Pops(receivedData) + if receivedData.Size() == 0 { + retry++ + if retry >= 10 { + return false + } + } else { + return true + } + } +} + +func TestZmqReconnect(t *testing.T) { + // create ZMQ server + db := swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false) + zmqServer := swsscommon.NewZmqServer("tcp://*:1234") + var TEST_TABLE string = "DASH_ROUTE" + consumer := swsscommon.NewZmqConsumerStateTable(db, TEST_TABLE, zmqServer) + + // create ZMQ client side + zmqAddress := "tcp://127.0.0.1:1234" + client := MixedDbClient { + applDB : swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false), + tableMap : map[string]swsscommon.ProducerStateTable{}, + zmqClient : swsscommon.NewZmqClient(zmqAddress), + } + + data := map[string]string{} + var TEST_KEY string = "TestKey" + client.DbSetTable(TEST_TABLE, TEST_KEY, data) + if !ReceiveFromZmq(consumer) { + t.Errorf("Receive data from ZMQ failed") + } + + // recreate ZMQ server to trigger re-connect + swsscommon.DeleteZmqConsumerStateTable(consumer) + swsscommon.DeleteZmqServer(zmqServer) + zmqServer = swsscommon.NewZmqServer("tcp://*:1234") + consumer = swsscommon.NewZmqConsumerStateTable(db, TEST_TABLE, zmqServer) + + // send data again, client will reconnect + client.DbSetTable(TEST_TABLE, TEST_KEY, data) + if !ReceiveFromZmq(consumer) { + t.Errorf("Receive data from ZMQ failed") + } +} + +func TestRetryHelper(t *testing.T) { + // create ZMQ server + zmqServer := swsscommon.NewZmqServer("tcp://*:2234") + + // create ZMQ client side + zmqAddress := "tcp://127.0.0.1:2234" + zmqClient := swsscommon.NewZmqClient(zmqAddress) + returnError := true + exeCount := 0 + RetryHelper( + zmqClient, + func () (err error) { + exeCount++ + if returnError { + returnError = false + return fmt.Errorf("connection_reset") + } + return nil + }) + + if exeCount == 1 { + t.Errorf("RetryHelper does not retry") + } + + if exeCount > 2 { + t.Errorf("RetryHelper retry too much") + } + + swsscommon.DeleteZmqServer(zmqServer) +} diff --git a/sonic_data_client/mixed_db_client.go b/sonic_data_client/mixed_db_client.go index 5d53fb96..084cb78b 100644 --- a/sonic_data_client/mixed_db_client.go +++ b/sonic_data_client/mixed_db_client.go @@ -30,7 +30,12 @@ import ( const REDIS_SOCK string = "/var/run/redis/redis.sock" const APPL_DB int = 0 +const APPL_DB_NAME string = "APPL_DB" +const DASH_TABLE_PREFIX string = "DASH_" const SWSS_TIMEOUT uint = 0 +const MAX_RETRY_COUNT uint = 5 +const RETRY_DELAY_MILLISECOND uint = 100 +const RETRY_DELAY_FACTOR uint = 2 const CHECK_POINT_PATH string = "/etc/sonic" const ( @@ -59,6 +64,7 @@ type MixedDbClient struct { workPath string jClient *JsonClient applDB swsscommon.DBConnector + zmqClient swsscommon.ZmqClient tableMap map[string]swsscommon.ProducerStateTable synced sync.WaitGroup // Control when to send gNMI sync_response @@ -66,6 +72,29 @@ type MixedDbClient struct { mu sync.RWMutex // Mutex for data protection among routines for DbClient } +var mixedDbClientMap = map[string]MixedDbClient{} + +func getMixedDbClient(zmqAddress string) (MixedDbClient) { + client, ok := mixedDbClientMap[zmqAddress] + if !ok { + client = MixedDbClient { + applDB : swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false), + tableMap : map[string]swsscommon.ProducerStateTable{}, + } + + // enable ZMQ by zmqAddress parameter + if zmqAddress != "" { + client.zmqClient = swsscommon.NewZmqClient(zmqAddress) + } else { + client.zmqClient = nil + } + + mixedDbClientMap[zmqAddress] = client + } + + return client +} + func parseJson(str []byte) (interface{}, error) { var res interface{} err := json.Unmarshal(str, &res) @@ -98,12 +127,70 @@ func ParseTarget(target string, paths []*gnmipb.Path) (string, error) { return target, nil } -func (c *MixedDbClient) DbSetTable(table string, key string, values map[string]string) error { +func (c *MixedDbClient) GetTable(table string) (swsscommon.ProducerStateTable) { pt, ok := c.tableMap[table] if !ok { - pt = swsscommon.NewProducerStateTable(c.applDB, table) + if strings.HasPrefix(table, DASH_TABLE_PREFIX) && c.zmqClient != nil { + log.V(2).Infof("Create ZmqProducerStateTable: %s", table) + pt = swsscommon.NewZmqProducerStateTable(c.applDB, table, c.zmqClient) + } else { + log.V(2).Infof("Create ProducerStateTable: %s", table) + pt = swsscommon.NewProducerStateTable(c.applDB, table) + } + c.tableMap[table] = pt } + + return pt +} + +func CatchException(err *error) { + if r := recover(); r != nil { + *err = fmt.Errorf("%v", r) + } +} + +func ProducerStateTableSetWrapper(pt swsscommon.ProducerStateTable, key string, value swsscommon.FieldValuePairs) (err error) { + // convert panic to error + defer CatchException(&err) + pt.Set(key, value, "SET", "") + return +} + +func ProducerStateTableDeleteWrapper(pt swsscommon.ProducerStateTable, key string) (err error) { + // convert panic to error + defer CatchException(&err) + pt.Delete(key, "DEL", "") + return +} + +type ActionNeedRetry func() error + +func RetryHelper(zmqClient swsscommon.ZmqClient, action ActionNeedRetry) { + var retry uint = 0 + var retry_delay = time.Duration(RETRY_DELAY_MILLISECOND) * time.Millisecond + ConnectionResetErr := "connection_reset" + for { + err := action() + if err != nil { + if (err.Error() == ConnectionResetErr && retry <= MAX_RETRY_COUNT) { + log.V(6).Infof("RetryHelper: connection reset, reconnect and retry later") + time.Sleep(retry_delay) + + zmqClient.Connect() + retry_delay *= time.Duration(RETRY_DELAY_FACTOR) + retry++ + continue + } + + panic(err) + } + + return + } +} + +func (c *MixedDbClient) DbSetTable(table string, key string, values map[string]string) error { vec := swsscommon.NewFieldValuePairs() defer swsscommon.DeleteFieldValuePairs(vec) for k, v := range values { @@ -111,22 +198,28 @@ func (c *MixedDbClient) DbSetTable(table string, key string, values map[string]s vec.Add(pair) swsscommon.DeleteFieldValuePair(pair) } - pt.Set(key, vec, "SET", "") + + pt := c.GetTable(table) + RetryHelper( + c.zmqClient, + func () error { + return ProducerStateTableSetWrapper(pt, key, vec) + }) return nil } func (c *MixedDbClient) DbDelTable(table string, key string) error { - pt, ok := c.tableMap[table] - if !ok { - pt = swsscommon.NewProducerStateTable(c.applDB, table) - c.tableMap[table] = pt - } - pt.Delete(key, "DEL", "") + pt := c.GetTable(table) + RetryHelper( + c.zmqClient, + func () error { + return ProducerStateTableDeleteWrapper(pt, key) + }) + return nil } -func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string) (Client, error) { - var client MixedDbClient +func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, zmqAddress string) (Client, error) { var err error // Testing program may ask to use redis local tcp connection @@ -134,6 +227,7 @@ func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string) useRedisTcpClient() } + var client = getMixedDbClient(zmqAddress) client.prefix = prefix client.target = "" client.origin = origin @@ -159,8 +253,6 @@ func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string) } client.paths = paths client.workPath = common_utils.GNMI_WORK_PATH - client.applDB = swsscommon.NewDBConnector(APPL_DB, REDIS_SOCK, SWSS_TIMEOUT) - client.tableMap = map[string]swsscommon.ProducerStateTable{} return &client, nil } @@ -1059,10 +1151,7 @@ func (c *MixedDbClient) Capabilities() []gnmipb.ModelData { } func (c *MixedDbClient) Close() error { - for _, pt := range c.tableMap { - swsscommon.DeleteProducerStateTable(pt) - } - swsscommon.DeleteDBConnector(c.applDB) + // Do nothing here, because MixedDbClient will be cache in mixedDbClientMap and reuse return nil } @@ -1070,5 +1159,4 @@ func (c *MixedDbClient) SentOne(val *Value) { } func (c *MixedDbClient) FailedSend() { -} - +} \ No newline at end of file diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index e15cf779..caf9d965 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -25,6 +25,7 @@ var ( caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.") serverCert = flag.String("server_crt", "", "TLS server certificate") serverKey = flag.String("server_key", "", "TLS server private key") + zmqAddress = flag.String("zmq_address", "", "Orchagent ZMQ address, when not set or empty string telemetry server will switch to Redis based communication channel.") insecure = flag.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!") noTLS = flag.Bool("noTLS", false, "disable TLS, for testing only!") allowNoClientCert = flag.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate.") @@ -81,6 +82,7 @@ func main() { cfg.EnableTranslibWrite = bool(*gnmi_translib_write) cfg.EnableNativeWrite = bool(*gnmi_native_write) cfg.LogLevel = 3 + cfg.ZmqAddress = *zmqAddress cfg.Threshold = int(*threshold) cfg.IdleConnDuration = int(*idle_conn_duration) var opts []grpc.ServerOption