Skip to content

Commit

Permalink
[Mellanox] Enable CMIS host management (sonic-net#16846)
Browse files Browse the repository at this point in the history
- Why I did it
Enable CMIS host management for Mellanox devices which are expected to support the feature

- How I did it
new thread in a new file and changing logic in platform code in chassis.py which is calling this thread from get_change_event()
this thread in the new file handles the state machine per port.
first the static detection takes place once the thread is up (during switch bootup sequence), until final decision if it's FW control or SW control module.
After it ends, the dynamic detection takes place, listening to changes in the sysfs fds, per port,
so it will be able to detect plug in or out events of a cable.

- How to verify it
Enhanced unit tests
run sonic mgmt on Nvidia SN4700 with CMIS host management enabled
  • Loading branch information
dbarashinvd authored and keboliu committed Dec 19, 2023
1 parent 4f8a323 commit e9ac1fa
Show file tree
Hide file tree
Showing 4 changed files with 783 additions and 37 deletions.
59 changes: 35 additions & 24 deletions platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
from . import utils
from .device_data import DeviceDataManager
import re
import queue
import threading
import time
from sonic_platform import modules_mgmt
except ImportError as e:
raise ImportError (str(e) + "- required module not found")

Expand Down Expand Up @@ -124,6 +127,10 @@ def __init__(self):
self._RJ45_port_inited = False
self._RJ45_port_list = None

self.modules_mgmt_thread = threading.Thread()
self.modules_changes_queue = queue.Queue()
self.modules_mgmt_task_stopping_event = threading.Event()

logger.log_info("Chassis loaded successfully")

def __del__(self):
Expand Down Expand Up @@ -371,7 +378,7 @@ def get_change_event(self, timeout=0):
Returns:
(bool, dict):
- True if call successful, False if not;
- True if call successful, False if not; - Deprecated, will always return True
- A nested dictionary where key is a device type,
value is a dictionary with key:value pairs in the format of
{'device_id':'device_event'},
Expand All @@ -383,38 +390,42 @@ def get_change_event(self, timeout=0):
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.modules_mgmt_thread.is_alive():
# open new SFP change events thread
self.modules_mgmt_thread = modules_mgmt.ModulesMgmtTask(q=self.modules_changes_queue
, main_thread_stop_event = self.modules_mgmt_task_stopping_event)
# Set the thread as daemon so when pmon/xcvrd are shutting down, modules_mgmt will shut down immedietly.
self.modules_mgmt_thread.daemon = True
self.modules_mgmt_thread.start()
self.initialize_sfp()
# Initialize SFP event first
if not self.sfp_event:
from .sfp_event import sfp_event
self.sfp_event = sfp_event(self.RJ45_port_list)
self.sfp_event.initialize()

wait_for_ever = (timeout == 0)
# select timeout should be no more than 1000ms to ensure fast shutdown flow
select_timeout = 1000.0 if timeout >= 1000 else float(timeout)
# poll timeout should be no more than 1000ms to ensure fast shutdown flow
timeout = 1000.0 if timeout >= 1000 else float(timeout)
port_dict = {}
error_dict = {}
begin = time.time()
i = 0
while True:
status = self.sfp_event.check_sfp_status(port_dict, error_dict, select_timeout)
if bool(port_dict):
break

if not wait_for_ever:
elapse = time.time() - begin
if elapse * 1000 > timeout:
break
try:
logger.log_info(f'get_change_event() trying to get changes from queue on iteration {i}')
port_dict = self.modules_changes_queue.get(timeout=timeout / 1000)
logger.log_info(f'get_change_event() iteration {i} port_dict: {port_dict}')
except queue.Empty:
logger.log_info(f"failed to get item from modules changes queue on itertaion {i}")

if status:
if port_dict:
self.reinit_sfps(port_dict)
result_dict = {'sfp': port_dict}
if error_dict:
result_dict = {'sfp': port_dict}
result_dict['sfp_error'] = error_dict
return True, result_dict
else:
return True, {'sfp': {}}
return True, result_dict
else:
if not wait_for_ever:
elapse = time.time() - begin
logger.log_info(f"get_change_event: wait_for_ever {wait_for_ever} elapse {elapse} iteartion {i}")
if elapse * 1000 >= timeout:
logger.log_info(f"elapse {elapse} > timeout {timeout} iteartion {i} returning empty dict")
return True, {'sfp': {}}
i += 1

def reinit_sfps(self, port_dict):
"""
Expand All @@ -426,7 +437,7 @@ def reinit_sfps(self, port_dict):
for index, status in port_dict.items():
if status == sfp.SFP_STATUS_INSERTED:
try:
self._sfp_list[index - 1].reinit()
self._sfp_list[int(index) - 1].reinit()
except Exception as e:
logger.log_error("Fail to re-initialize SFP {} - {}".format(index, repr(e)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def is_psu_hotswapable(cls):
@classmethod
@utils.read_only_cache()
def get_sfp_count(cls):
return utils.read_int_from_file('/run/hw-management/config/sfp_counter')
sfp_count = utils.read_int_from_file('/run/hw-management/config/sfp_counter')
return sfp_count if sfp_count > 0 else len(glob.glob('/sys/module/sx_core/asic0/module*'))

@classmethod
def get_linecard_sfp_count(cls, lc_index):
Expand Down
Loading

0 comments on commit e9ac1fa

Please sign in to comment.