diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index bf791e79..94a4147a 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -161,7 +161,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { if origin == "openconfig" { dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{}) } else if IsNativeOrigin(origin) { - dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "", "") + dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "", "", "") } else if len(origin) != 0 { return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin) } else if target == "" { diff --git a/gnmi_server/server.go b/gnmi_server/server.go index f3ec24ce..1bf17cef 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -83,6 +83,7 @@ type Config struct { EnableTranslibWrite bool EnableNativeWrite bool ZmqPort string + DpuProxyBaseAddr string IdleConnDuration int ConfigTableName string Vrf string @@ -410,7 +411,7 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe } } if check := IsNativeOrigin(origin); check { - dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf) + dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr) } else { dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions) } @@ -508,7 +509,7 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe common_utils.IncCounter(common_utils.GNMI_SET_FAIL) return nil, grpc.Errorf(codes.Unimplemented, "GNMI native write is disabled") } - dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf) + dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr) } else { if s.config.EnableTranslibWrite == false { common_utils.IncCounter(common_utils.GNMI_SET_FAIL) @@ -585,7 +586,7 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest var supportedModels []gnmipb.ModelData dc, _ := sdc.NewTranslClient(nil, nil, ctx, extensions) supportedModels = append(supportedModels, dc.Capabilities()...) - dc, _ = sdc.NewMixedDbClient(nil, nil, "", gnmipb.Encoding_JSON_IETF, s.config.ZmqPort, s.config.Vrf) + dc, _ = sdc.NewMixedDbClient(nil, nil, "", gnmipb.Encoding_JSON_IETF, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr) supportedModels = append(supportedModels, dc.Capabilities()...) suppModels := make([]*gnmipb.ModelData, len(supportedModels)) diff --git a/sonic_data_client/client_test.go b/sonic_data_client/client_test.go index 55cadaf4..07b0e212 100644 --- a/sonic_data_client/client_test.go +++ b/sonic_data_client/client_test.go @@ -756,17 +756,17 @@ func TestGetDpuAddress(t *testing.T) { } // test get ZMQ address - address, err = getZmqAddress("dpu0", "1234") + address, err = getZmqAddress("dpu0", "1234", "") if address != "tcp://127.0.0.2:1234" { t.Errorf("get invalid DPU address failed") } - address, err = getZmqAddress("dpu0", "") + address, err = getZmqAddress("dpu0", "", "") if err == nil { t.Errorf("get invalid ZMQ address failed") } - address, err = getZmqAddress("", "1234") + address, err = getZmqAddress("", "1234", "") if err == nil { t.Errorf("get invalid ZMQ address failed") } @@ -777,6 +777,37 @@ func TestGetDpuAddress(t *testing.T) { swsscommon.DeleteDBConnector(configDb) } +func TestGetDpuProxyAddress(t *testing.T) { + address, err := getDpuProxyAddress("dpu1", "127.0.10.10") + if err != nil { + t.Errorf("get DPU address failed") + } + + if address != "127.0.10.11" { + t.Errorf("invalid DPU address") + } + + address, err = getDpuProxyAddress("dpu_no_index", "127.0.10.10") + if err == nil { + t.Errorf("get with invalid DPU failed") + } + + address, err = getDpuProxyAddress("dpu1", "invalid IP") + if err == nil { + t.Errorf("get with invalid base address failed") + } + + address, err = getDpuProxyAddress("dpu1", "::1") + if err == nil { + t.Errorf("get with IPv6 base address failed") + } + + address, err = getDpuProxyAddress("dpu300", "127.0.10.10") + if err == nil { + t.Errorf("get with dpu index out of range failed") + } +} + func TestGetZmqClient(t *testing.T) { if !swsscommon.SonicDBConfigIsInit() { swsscommon.SonicDBConfigInitialize() @@ -793,17 +824,17 @@ func TestGetZmqClient(t *testing.T) { dpusTable.Hset("dpu0", "midplane_interface", "dpu0") dhcpPortTable.Hset("bridge-midplane|dpu0", "ips@", "127.0.0.2,127.0.0.1") - client, err := getZmqClient("dpu0", "", "") + client, err := getZmqClient("dpu0", "", "", "") if client != nil || err != nil { t.Errorf("empty ZMQ port should not get ZMQ client") } - client, err = getZmqClient("dpu0", "1234", "") + client, err = getZmqClient("dpu0", "1234", "", "") if client == nil { t.Errorf("get ZMQ client failed") } - client, err = getZmqClient("", "1234", "") + client, err = getZmqClient("", "1234", "", "") if client == nil { t.Errorf("get ZMQ client failed") } diff --git a/sonic_data_client/mixed_db_client.go b/sonic_data_client/mixed_db_client.go index 4fe1bb22..e395a75d 100644 --- a/sonic_data_client/mixed_db_client.go +++ b/sonic_data_client/mixed_db_client.go @@ -142,13 +142,47 @@ func getDpuAddress(dpuId string) (string, error) { return dpuAddressArray[0], nil } -func getZmqAddress(container string, zmqPort string) (string, error) { +func getDpuProxyAddress(dpuId string, dpuProxyBaseAddr string) (string, error) { + dpuIndexStr := strings.TrimPrefix(dpuId, "dpu") + dpuIndex, err := strconv.Atoi(dpuIndexStr) + if err != nil { + return "", fmt.Errorf("Failed to parse DPU index from %s: %v", dpuId, err) + } + + baseIp := net.ParseIP(dpuProxyBaseAddr) + if baseIp == nil { + return "", fmt.Errorf("Invalid DPU proxy base address: %s", dpuProxyBaseAddr) + } + + baseIp = baseIp.To4() + if baseIp == nil { + return "", fmt.Errorf("Expecting an IPv4 address for DPU proxy: %s", dpuProxyBaseAddr) + } + + lastOctet := int(baseIp[3]) + dpuIndex + if lastOctet > 255 { + return "", fmt.Errorf("DPU index is out of range") + } + + baseIp[3] = byte(lastOctet) + return baseIp.String(), nil +} + +func getZmqAddress(container string, zmqPort string, dpuProxyBaseAddr string) (string, error) { // when zmqPort empty, ZMQ feature disabled if zmqPort == "" { return "", fmt.Errorf("ZMQ port is empty.") } - var dpuAddress, err = getDpuAddress(container) + var dpuAddress string + var err error + + if dpuProxyBaseAddr != "" { + dpuAddress, err = getDpuProxyAddress(container, dpuProxyBaseAddr) + } else { + dpuAddress, err = getDpuAddress(container) + } + if err != nil { return "", fmt.Errorf("Get DPU address failed: %v", err) } @@ -181,7 +215,7 @@ func removeZmqClient(zmqClient swsscommon.ZmqClient) (error) { return fmt.Errorf("Can't find ZMQ client in zmqClientMap: %v", zmqClient) } -func getZmqClient(dpuId string, zmqPort string, vrf string) (swsscommon.ZmqClient, error) { +func getZmqClient(dpuId string, zmqPort string, vrf string, dpuProxyBaseAddr string) (swsscommon.ZmqClient, error) { if zmqPort == "" { // ZMQ feature disabled when zmqPort flag not set return nil, nil @@ -192,7 +226,7 @@ func getZmqClient(dpuId string, zmqPort string, vrf string) (swsscommon.ZmqClien return getZmqClientByAddress("tcp://" + LOCAL_ADDRESS + ":" + zmqPort, vrf) } - zmqAddress, err := getZmqAddress(dpuId, zmqPort) + zmqAddress, err := getZmqAddress(dpuId, zmqPort, dpuProxyBaseAddr) if err != nil { return nil, fmt.Errorf("Get ZMQ address failed: %v", err) } @@ -493,7 +527,7 @@ func init() { initRedisDbMap() } -func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, encoding gnmipb.Encoding, zmqPort string, vrf string) (Client, error) { +func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, encoding gnmipb.Encoding, zmqPort string, vrf string, dpuProxyBaseAddr string) (Client, error) { var err error // Initialize RedisDbMap for test @@ -556,7 +590,7 @@ func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, client.workPath = common_utils.GNMI_WORK_PATH // continer is DPU ID - client.zmqClient, err = getZmqClient(container, zmqPort, vrf) + client.zmqClient, err = getZmqClient(container, zmqPort, vrf, dpuProxyBaseAddr) if err != nil { return nil, fmt.Errorf("Get ZMQ client failed: %v", err) } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index cb56e10c..a5fac830 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -46,6 +46,7 @@ type TelemetryConfig struct { ConfigTableName *string ZmqAddress *string ZmqPort *string + DashProxyAddr *string Insecure *bool NoTLS *bool AllowNoClientCert *bool @@ -155,6 +156,7 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) { ConfigTableName: fs.String("config_table_name", "", "Config table name"), ZmqAddress: fs.String("zmq_address", "", "Orchagent ZMQ address, deprecated, please use zmq_port."), ZmqPort: fs.String("zmq_port", "", "Orchagent ZMQ port, when not set or empty string telemetry server will switch to Redis based communication channel."), + DashProxyAddr: fs.String("zmq_dpu_proxy_address_base", "", "Dash offload manager ZMQ base address, when set, the DPU configuration will be send to the proxy address instead of directly to the DPU."), Insecure: fs.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!"), NoTLS: fs.Bool("noTLS", false, "disable TLS, for testing only!"), AllowNoClientCert: fs.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate."), @@ -242,6 +244,7 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) { } cfg.ZmqPort = zmqPort + cfg.DpuProxyBaseAddr = *telemetryCfg.DashProxyAddr return telemetryCfg, cfg, nil }