From 923958f41fda5103ba5392a08c442a7963c325b0 Mon Sep 17 00:00:00 2001 From: vdahiya12 <67608553+vdahiya12@users.noreply.github.com> Date: Wed, 17 Jul 2024 10:27:58 -0700 Subject: [PATCH] [ycabled][active-active] Fix in gRPC channel callback logic by creating swsscommon table within the context (#509) * [ycabled][active-active] Fix in gRPC channel callback logic by creating swsscommon table within the context Signed-off-by: Vaibhav Dahiya * fix UT Signed-off-by: Vaibhav Dahiya * add more tests Signed-off-by: Vaibhav Dahiya * typo Signed-off-by: Vaibhav Dahiya * add port Signed-off-by: Vaibhav Dahiya * add logging Signed-off-by: Vaibhav Dahiya * add tests Signed-off-by: Vaibhav Dahiya --------- Signed-off-by: Vaibhav Dahiya --- sonic-ycabled/tests/test_y_cable_helper.py | 56 ++++--- sonic-ycabled/ycable/ycable.py | 2 +- .../ycable/ycable_utilities/y_cable_helper.py | 141 ++++++++++-------- .../ycable_utilities/y_cable_table_helper.py | 27 +++- 4 files changed, 143 insertions(+), 83 deletions(-) diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index bdb631957..169351060 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -1754,7 +1754,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) + logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stop_event=threading.Event()) assert(rc == None) @@ -1787,7 +1787,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) + logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stop_event=threading.Event()) assert(rc == None) @@ -1831,7 +1831,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) + logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stop_event=threading.Event()) assert(rc == None) @@ -1878,7 +1878,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) + logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stop_event=threading.Event()) assert(rc == None) @@ -1918,7 +1918,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence,port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) + logical_port_dict, y_cable_presence,port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stop_event=threading.Event()) assert(rc == None) @@ -6496,7 +6496,7 @@ def test_retry_setup_grpc_channel_for_port_incorrect(self): port_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "PORT_INFO_TABLE") grpc_client , fwd_state_response_tbl = {}, {} - rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client, fwd_state_response_tbl) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client) assert(rc == False) @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None,None))) @@ -6514,7 +6514,7 @@ def test_retry_setup_grpc_channel_for_port_correct_none_val(self): port_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "PORT_INFO_TABLE") grpc_client , fwd_state_response_tbl = {}, {} - rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client, fwd_state_response_tbl) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client) assert(rc == False) def test_process_loopback_interface_and_get_read_side_rc(self): @@ -6532,7 +6532,7 @@ def test_process_loopback_interface_and_get_read_side_rc(self): port_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "PORT_INFO_TABLE") grpc_client , fwd_state_response_tbl = {}, {} - rc = retry_setup_grpc_channel_for_port("Ethernet0", 0 , port_tbl, grpc_client, fwd_state_response_tbl ) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0 , port_tbl, grpc_client) assert(rc == False) @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(True,True))) @@ -6550,7 +6550,7 @@ def test_retry_setup_grpc_channel_for_port_correct(self): port_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "PORT_INFO_TABLE") grpc_client , fwd_state_response_tbl = {}, {} - rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client, fwd_state_response_tbl) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client) assert(rc == True) @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None,None))) @@ -6568,7 +6568,7 @@ def test_retry_setup_grpc_channel_for_port_correct_none_val(self): port_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "PORT_INFO_TABLE") grpc_client , fwd_state_response_tbl = {}, {} - rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client, fwd_state_response_tbl) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0, port_tbl, grpc_client) assert(rc == False) def test_process_loopback_interface_and_get_read_side_rc(self): @@ -6619,7 +6619,7 @@ def test_check_identifier_presence_and_setup_channel(self): mux_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "MUX_INFO_TABLE") - rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl) + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client) assert(rc == None) @@ -6655,7 +6655,7 @@ def test_check_identifier_presence_and_setup_channel_with_false_status(self): mux_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "MUX_INFO_TABLE") - rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl) + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client) assert(rc == None) @@ -6692,7 +6692,7 @@ def test_check_identifier_presence_and_setup_channel_with_mock(self): mux_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "MUX_INFO_TABLE") - rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl) + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client) assert(rc == None) @@ -6730,7 +6730,7 @@ def test_check_identifier_presence_and_setup_channel_with_mock_not_none(self): mux_tbl[asic_index] = swsscommon.Table( test_db[asic_index], "MUX_INFO_TABLE") - rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl) + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client) assert(rc == None) @@ -6754,7 +6754,7 @@ def test_setup_grpc_channel_for_port(self): with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util: patched_util.get_asic_id_for_logical_port.return_value = 0 - (channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1", asic_index, grpc_client, fwd_state_response_tbl, False) + (channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1", asic_index, grpc_client, False) assert(stub == True) assert(channel != None) @@ -6779,7 +6779,7 @@ def test_setup_grpc_channel_for_port_get_false(self): with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util: patched_util.get_asic_id_for_logical_port.return_value = 0 - (channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1", asic_index, grpc_client, fwd_state_response_tbl, False) + (channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1", asic_index, grpc_client, False) assert(stub == True) assert(channel != None) @@ -6803,7 +6803,7 @@ def test_setup_grpc_channels(self): patched_util.logical.return_value = ['Ethernet0', 'Ethernet4'] patched_util.get_asic_id_for_logical_port.return_value = 0 loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client, fwd_state_response_tbl = {}, {}, {}, {}, {}, {}, {}, {} - rc = setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client, fwd_state_response_tbl) + rc = setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client) assert(rc == None) @@ -7764,3 +7764,25 @@ def test_ycable_helper_table_worker_toggle_active_active(self, mock_select, mock assert swsscommon.Select.select.call_count == 1 + def test_ycable_wait_for_state_change(self): + + channel_conn = grpc.ChannelConnectivity.TRANSIENT_FAILURE + port = 'Ethernet0' + rc = wait_for_state_change(channel_conn, port) + + assert (rc == None) + + channel_conn = grpc.ChannelConnectivity.CONNECTING + rc = wait_for_state_change(channel_conn, port) + + assert (rc == None) + + channel_conn = grpc.ChannelConnectivity.READY + rc = wait_for_state_change(channel_conn, port) + + assert (rc == None) + + channel_conn = grpc.ChannelConnectivity.SHUTDOWN + rc = wait_for_state_change(channel_conn, port) + + assert (rc == None) diff --git a/sonic-ycabled/ycable/ycable.py b/sonic-ycabled/ycable/ycable.py index 0b6ad0406..f430f1a42 100644 --- a/sonic-ycabled/ycable/ycable.py +++ b/sonic-ycabled/ycable/ycable.py @@ -113,7 +113,7 @@ def handle_state_update_task(op, port, fvp_dict, y_cable_presence, port_tbl, por port_dict[port] = SFP_STATUS_REMOVED y_cable_helper.change_ports_status_for_y_cable_change_event( - port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event) + port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stopping_event) # # Helper classes =============================================================== diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index 562246802..f734864d2 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -346,7 +346,7 @@ def wrapper(*args, **kwargs): return wrapper -def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, fwd_state_response_tbl): +def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client): global grpc_port_stubs global grpc_port_channels @@ -366,7 +366,7 @@ def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, f if soc_ipv4_full is not None: soc_ipv4 = soc_ipv4_full.split('/')[0] - channel, stub = setup_grpc_channel_for_port(port, soc_ipv4, asic_index, grpc_client, fwd_state_response_tbl, False) + channel, stub = setup_grpc_channel_for_port(port, soc_ipv4, asic_index, grpc_client, False) if channel is None or stub is None: helper_logger.log_notice( "stub is None, while reattempt setting up channels did not work {}".format(port)) @@ -458,34 +458,48 @@ def connect_channel(channel, stub, port): else: break -def create_channel(type_chan, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async): +def wait_for_state_change(channel_connectivity, port): + # Initialize YcableChannelStateTableHelper only once + if not hasattr(wait_for_state_change, 'table_helper'): + wait_for_state_change.table_helper = {} - # Helper callback to get an channel connectivity state - def wait_for_state_change(channel_connectivity): - if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE: - helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port)) - # for connectivity state to FAILURE/IDLE report a failure - fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')]) - fwd_state_response_tbl[asic_index].set(port, fvs_updated) - grpc_port_connectivity[port] = "TRANSIENT_FAILURE" - - if channel_connectivity == grpc.ChannelConnectivity.CONNECTING: - helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port)) - grpc_port_connectivity[port] = "CONNECTING" - if channel_connectivity == grpc.ChannelConnectivity.READY: - helper_logger.log_notice("gRPC port {} state changed to READY".format(port)) - grpc_port_connectivity[port] = "READY" - if channel_connectivity == grpc.ChannelConnectivity.IDLE: - helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port)) - # for connectivity state to FAILURE/IDLE report a failure - fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')]) - fwd_state_response_tbl[asic_index].set(port, fvs_updated) - grpc_port_connectivity[port] = "IDLE" - - if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN: - helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port)) - grpc_port_connectivity[port] = "SHUTDOWN" + if wait_for_state_change.table_helper.get(port, None) is None: + wait_for_state_change.table_helper[port] = y_cable_table_helper.YcableChannelStateTableHelper() + + # Use the initialized table_helper + table_helper = wait_for_state_change.table_helper[port] + + # get the appropriate table_helper + fwd_state_response_tbl = table_helper.get_fwd_state_response_tbl() + asic_index = multi_asic.get_asic_index_from_namespace(DEFAULT_NAMESPACE) + + if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE: + helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE tid {}".format(port, threading.currentThread().getName())) + # for connectivity state to FAILURE/IDLE report a failure + fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')]) + fwd_state_response_tbl[asic_index].set(port, fvs_updated) + grpc_port_connectivity[port] = "TRANSIENT_FAILURE" + if channel_connectivity == grpc.ChannelConnectivity.CONNECTING: + helper_logger.log_notice("gRPC port {} state changed to CONNECTING tid {}".format(port, threading.currentThread().getName())) + grpc_port_connectivity[port] = "CONNECTING" + if channel_connectivity == grpc.ChannelConnectivity.READY: + helper_logger.log_notice("gRPC port {} state changed to READY tid {}".format(port, threading.currentThread().getName())) + grpc_port_connectivity[port] = "READY" + if channel_connectivity == grpc.ChannelConnectivity.IDLE: + helper_logger.log_notice("gRPC port {} state changed to IDLE tid {}".format(port, threading.currentThread().getName())) + # for connectivity state to FAILURE/IDLE report a failure + fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')]) + fwd_state_response_tbl[asic_index].set(port, fvs_updated) + grpc_port_connectivity[port] = "IDLE" + + if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN: + helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN tid {}".format(port, threading.currentThread().getName())) + grpc_port_connectivity[port] = "SHUTDOWN" + + + +def create_channel(type_chan, level, kvp, soc_ip, port, asic_index, is_async): if type_chan == "secure": credential = get_grpc_credentials(level, kvp) @@ -517,7 +531,8 @@ def wait_for_state_change(channel_connectivity): if not is_async and channel is not None: - channel.subscribe(wait_for_state_change) + channel.subscribe(lambda channel_connectivity: wait_for_state_change(channel_connectivity, port)) + #connect_channel(channel, stub, port) """ @@ -528,7 +543,7 @@ def wait_for_state_change(channel_connectivity): return channel, stub -def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, fwd_state_response_tbl, is_async): +def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, is_async): """ Dummy values for lab for now @@ -570,7 +585,7 @@ def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, fwd_state kvp = dict(fvs) - channel, stub = create_channel(type_chan, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async) + channel, stub = create_channel(type_chan, level, kvp, soc_ip, port, asic_index, is_async) if stub is None: helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) @@ -585,7 +600,7 @@ def put_init_values_for_grpc_states(port, read_side, hw_mux_cable_tbl, hw_mux_ca stub = grpc_port_stubs.get(port, None) request = linkmgr_grpc_driver_pb2.AdminRequest(portid=DEFAULT_PORT_IDS, state=[0, 0]) if stub is None: - helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {}".format(port)) + helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {} tid {} writing unknown".format(port, threading.currentThread().getName())) fvs_updated = swsscommon.FieldValuePairs([('state', 'unknown'), ('read_side', str(read_side)), ('active_side', 'unknown')]) @@ -600,9 +615,9 @@ def put_init_values_for_grpc_states(port, read_side, hw_mux_cable_tbl, hw_mux_ca fwd_response_port_ids = response.portid fwd_response_port_ids_state = response.state helper_logger.log_notice( - "forwarding state RPC received response port ids = {} port {}".format(fwd_response_port_ids, port)) + "initial forwarding state RPC received response port ids = {} port {} tid {}".format(fwd_response_port_ids, port, threading.currentThread().getName())) helper_logger.log_notice( - "forwarding state RPC received response state values = {} port {}".format(fwd_response_port_ids_state, port)) + "initial forwarding state RPC received response state values = {} port {} tid {}".format(fwd_response_port_ids_state, port, threading.currentThread().getName())) else: helper_logger.log_warning("response was none while doing init config state for gRPC HW_MUX_CABLE_TABLE {} ".format(port)) @@ -641,7 +656,7 @@ def process_loopback_interface_and_get_read_side(loopback_keys): return -1 -def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl): +def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client): global grpc_port_stubs global grpc_port_channels @@ -678,7 +693,7 @@ def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_ if prev_channel is not None and prev_stub is not None: return - channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, grpc_client, fwd_state_response_tbl, False) + channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, grpc_client, False) post_port_mux_info_to_db(logical_port_name, mux_tbl, asic_index, hw_mux_cable_tbl, 'pseudo-cable') if channel is not None: grpc_port_channels[logical_port_name] = channel @@ -703,7 +718,7 @@ def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_ "DAC cable logical to physical port mapping returned more than one physical ports while Channel setup Port {}".format(logical_port_name)) -def setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client, fwd_state_response_tbl): +def setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client): global read_side helper_logger.log_debug("Y_CABLE_DEBUG:setting up channels for active-active") @@ -753,7 +768,7 @@ def setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cabl if logical_port_name in port_table_keys[asic_index]: check_identifier_presence_and_setup_channel( - logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl) + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client) else: # This port does not exist in Port table of config but is present inside # logical_ports after loading the port_mappings from port_config_file @@ -1381,7 +1396,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen if status and cable_type == "active-active": grpc_port_stats[logical_port_name] = {} check_identifier_presence_and_setup_channel( - logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl) + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client) else: # This port does not exist in Port table of config but is present inside # logical_ports after loading the port_mappings from port_config_file @@ -1390,7 +1405,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen "Could not retreive port inside config_db PORT table {} for Y-Cable initiation".format(logical_port_name)) -def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()): +def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stop_event=threading.Event()): global read_side delete_change_event = [False] @@ -1422,7 +1437,7 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, po state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) if status and cable_type == "active-active": check_identifier_presence_and_setup_channel( - logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl) + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client) elif value == SFP_STATUS_REMOVED: helper_logger.log_info("Got SFP deleted ycable event") check_identifier_presence_and_delete_mux_table_entry( @@ -3283,7 +3298,7 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_ # TODO state only for dummy value in this request MSG remove this request = linkmgr_grpc_driver_pb2.AdminRequest(portid=DEFAULT_PORT_IDS, state=[0, 0]) helper_logger.log_debug( - "Y_CABLE_DEBUG:calling RPC for getting cli forwarding state read_side portid = {} Ethernet port {}".format(read_side, port)) + "Y_CABLE_DEBUG:calling RPC for getting cli forwarding state read_side portid = {} Ethernet port {} tid {}".format(read_side, port, threading.currentThread().getName())) stub = grpc_port_stubs.get(port, None) if stub is None: @@ -3301,10 +3316,10 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_ # Debug only, remove this section once Server side is Finalized fwd_response_port_ids = response.portid fwd_response_port_ids_state = response.state - helper_logger.log_notice( - "forwarding state RPC received response port ids = {} port {}".format(fwd_response_port_ids, port)) - helper_logger.log_notice( - "forwarding state RPC received response state values = {} port {}".format(fwd_response_port_ids_state, port)) + helper_logger.log_debug( + "procesing request hwmode forwarding state RPC received response port ids = {} port {} tid {}".format(fwd_response_port_ids, port, threading.currentThread().getName())) + helper_logger.log_debug( + "processing request hwmode forwarding state RPC received response state values = {} port {} tid {}".format(fwd_response_port_ids_state, port, threading.currentThread().getName())) else: helper_logger.log_notice("response was none cli handle_fwd_state_command_grpc_notification {} ".format(port)) @@ -3407,15 +3422,15 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side)) # TODO state only for dummy value in this request MSG remove this request = linkmgr_grpc_driver_pb2.AdminRequest(portid=DEFAULT_PORT_IDS, state=[0, 0]) - helper_logger.log_notice( - "calling RPC for getting forwarding state port = {} portid {} peer portid {} read_side {}".format(port, read_side, 1 - int(read_side), read_side)) + helper_logger.log_debug( + "processing request for fwd_state calling RPC for getting forwarding state port = {} portid {} peer portid {} read_side {} tid {}".format(port, read_side, 1 - int(read_side), read_side, threading.currentThread().getName())) self_state = "unknown" peer_state = "unknown" stub = grpc_port_stubs.get(port, None) if stub is None: - helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {}".format(port)) - retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, fwd_state_response_tbl) + helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {} tid ".format(port, threading.currentThread().getName())) + retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client) stub = grpc_port_stubs.get(port, None) if stub is None: helper_logger.log_warning( @@ -3432,10 +3447,10 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat # Debug only, remove this section once Server side is Finalized fwd_response_port_ids = response.portid fwd_response_port_ids_state = response.state - helper_logger.log_notice( - "forwarding state RPC received response port = {} portids {} read_side {}".format(port, fwd_response_port_ids,read_side)) - helper_logger.log_notice( - "forwarding state RPC received response port = {} state values = {} read_side {}".format(port, fwd_response_port_ids_state, read_side)) + helper_logger.log_debug( + "processing request for fwd_state forwarding state RPC received response port = {} portids {} read_side {} tid {}".format(port, fwd_response_port_ids, read_side, threading.currentThread().getName())) + helper_logger.log_debug( + "processing request for fwd state forwarding state RPC received response port = {} state values = {} read_side {} tid {}".format(port, fwd_response_port_ids_state, read_side, threading.currentThread().getName())) else: helper_logger.log_notice("response was none handle_fwd_state_command_grpc_notification {} ".format(port)) @@ -3497,7 +3512,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde stub = grpc_port_stubs.get(port, None) if stub is None: helper_logger.log_debug("Y_CABLE_DEBUG:stub is None for performing hw mux RPC port {}".format(port)) - retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, fwd_state_response_tbl) + retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client) stub = grpc_port_stubs.get(port, None) if stub is None: helper_logger.log_warning( @@ -3521,11 +3536,11 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde hw_response_port_ids = response.portid hw_response_port_ids_state = response.state helper_logger.log_notice( - "Set admin state RPC received response port {} port ids = {} curr_read_side {} read_side {}".format(port, hw_response_port_ids, curr_read_side, read_side)) + "Set admin state RPC received response port {} port ids = {} curr_read_side {} read_side {} tid {}".format(port, hw_response_port_ids, curr_read_side, read_side, threading.currentThread().getName())) helper_logger.log_notice( - "Set admin state RPC received response port {} state values = {} curr_read_side {} read_side {}".format(port, hw_response_port_ids_state, curr_read_side, read_side)) + "Set admin state RPC received response port {} state values = {} curr_read_side {} read_side {} tid {}".format(port, hw_response_port_ids_state, curr_read_side, read_side, threading.currentThread().getName())) else: - helper_logger.log_notice("response was none hw_mux_cable_table_grpc_notification {} ".format(port)) + helper_logger.log_notice("response was none hw_mux_cable_table_grpc_notification {} tid {}".format(port, threading.currentThread().getName())) active_side = parse_grpc_response_hw_mux_cable_change_state(ret, response, curr_read_side, port) @@ -3543,7 +3558,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde ('read_side', read_side), ('active_side', str(active_side))]) hw_mux_cable_tbl[asic_index].set(port, fvs_updated) - helper_logger.log_debug("Y_CABLE_DEBUG: processed the notification hw mux state cleanly {}".format(port)) + helper_logger.log_debug("Y_CABLE_DEBUG: processed the notification hw mux state cleanly {} stubs {} channels {}".format(port, len(grpc_port_stubs), len(grpc_port_channels))) else: helper_logger.log_info("Got a change event on port {} of table {} that does not contain state".format( port, swsscommon.APP_HW_MUX_CABLE_TABLE_NAME)) @@ -4040,7 +4055,7 @@ async def send_request_and_get_response(self): response_stream = self.stub.NotifyGracefulRestartStart(request) index = 0 async for response in response_stream: - helper_logger.log_notice("Async client received from direct read period port = {}: period = {} index = {} guid = {} notifytype {} msgtype = {}".format(self.port, response.period, index, response.guid, response.notifytype, response.msgtype)) + helper_logger.log_debug("Async client received from direct read period port = {}: period = {} index = {} guid = {} notifytype {} msgtype = {} tid {}".format(self.port, response.period, index, response.guid, response.notifytype, response.msgtype, threading.currentThread().getName())) helper_logger.log_debug("Async Debug only :{} {}".format(dir(response_stream), dir(response))) index = index+1 if response == grpc.aio.EOF: @@ -4048,14 +4063,14 @@ async def send_request_and_get_response(self): helper_logger.log_notice("Async client finished loop from direct read period port:{} ".format(self.port)) index = index+1 except grpc.RpcError as e: - helper_logger.log_notice("Async client port = {} exception occured because of {} ".format(self.port, e.code())) + helper_logger.log_notice("Async client port = {} exception occured because of {} tid {}".format(self.port, e.code(), threading.currentThread().getName())) await self.response_queue.put(response) async def process_response(self): while True: response = await self.response_queue.get() - helper_logger.log_debug("Async recieved a response from {} {}".format(self.port, response)) + helper_logger.log_debug("Async recieved a response from {} {} tid {}".format(self.port, response, threading.currentThread().getName())) # do something with response if response is not None: await asyncio.sleep(response.period) @@ -4108,7 +4123,7 @@ async def task_worker(self): if soc_ipv4_full is not None: soc_ipv4 = soc_ipv4_full.split('/')[0] - channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), True) + channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, self.table_helper.get_grpc_config_tbl(), True) client = GracefulRestartClient(logical_port_name, channel, read_side) tasks.append(asyncio.create_task(client.send_request_and_get_response())) diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_table_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_table_helper.py index 285b10019..ec8963594 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_table_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_table_helper.py @@ -3,11 +3,16 @@ helper utlities configuring y_cable tables for ycabled daemon """ - -from sonic_py_common import daemon_base +import threading +from sonic_py_common import daemon_base, logger from sonic_py_common import multi_asic from swsscommon import swsscommon + +SYSLOG_IDENTIFIER = "y_cable_table_helper" + +helper_logger = logger.Logger(SYSLOG_IDENTIFIER) + MUX_CABLE_STATIC_INFO_TABLE = "MUX_CABLE_STATIC_INFO" MUX_CABLE_INFO_TABLE = "MUX_CABLE_INFO" TRANSCEIVER_INFO_TABLE = 'TRANSCEIVER_INFO' @@ -532,3 +537,21 @@ def get_grpc_config_tbl(self): def get_fwd_state_response_tbl(self): return self.fwd_state_response_tbl + +class YcableChannelStateTableHelper(object): + def __init__(self): + + self.appl_db = {} + self.fwd_state_response_tbl = {} + + # Get the namespaces in the platform + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + self.appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace) + self.fwd_state_response_tbl[asic_id] = swsscommon.Table( + self.appl_db[asic_id], "FORWARDING_STATE_RESPONSE") + helper_logger.log_notice('created table instance from tid {}'.format(threading.currentThread().getName())) + + def get_fwd_state_response_tbl(self): + return self.fwd_state_response_tbl