Skip to content

Commit

Permalink
Restart caclmgrd whenever catch exception in child thread or in main …
Browse files Browse the repository at this point in the history
…thread

Signed-off-by: Zhaohui Sun <zhaohuisun@microsoft.com>
  • Loading branch information
ZhaohuiS committed Dec 12, 2024
1 parent b0b3ca5 commit e86e25a
Showing 1 changed file with 121 additions and 85 deletions.
206 changes: 121 additions & 85 deletions scripts/caclmgrd
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ try:
import sys
import threading
import time
import traceback
import signal
from queue import Empty, Queue
from sonic_py_common.general import getstatusoutput_noshell_pipe
from sonic_py_common import daemon_base, device_info, multi_asic
from swsscommon import swsscommon
Expand Down Expand Up @@ -129,6 +132,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
self.update_thread = {}
self.lock = {}
self.num_changes = {}
self.stop_event = threading.Event()

# Initialize update-thread-specific data for default namespace
self.update_thread[DEFAULT_NAMESPACE] = None
Expand Down Expand Up @@ -707,7 +711,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
table_ip_version = 4

# Read DST_PORT info from Config DB, insert it back to ACL_SERVICES
if acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT" in rule_props:
if acl_service == 'EXTERNAL_CLIENT':# and "L4_DST_PORT" in rule_props:
dst_ports = [rule_props["L4_DST_PORT"]]
self.ACL_SERVICES[acl_service]["dst_ports"] = dst_ports
elif acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT_RANGE" in rule_props:
Expand Down Expand Up @@ -852,7 +856,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
self.run_commands(dualtor_iptables_cmds)


def check_and_update_control_plane_acls(self, namespace, num_changes):
def check_and_update_control_plane_acls(self, namespace, num_changes, exception_queue):
"""
This function is intended to be spawned in a separate thread.
Its purpose is to prevent unnecessary iptables updates if we receive
Expand All @@ -869,6 +873,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
while True:
# Sleep for our delay interval
time.sleep(self.UPDATE_DELAY_SECS)
self.log_info("After delay {}s, checking for ACL table changes in namespace '{}'".format(self.UPDATE_DELAY_SECS, namespace))

with self.lock[namespace]:
if self.num_changes[namespace] > num_changes:
Expand All @@ -890,6 +895,18 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
self.num_changes[namespace] = 0
self.update_thread[namespace] = None
return
except Exception as e:
self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e)))
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = traceback.format_exception(exc_type, exc_value, exc_traceback)
for tb_line in msg:
for tb_line_split in tb_line.splitlines():
self.log_error(tb_line_split)
#self.stop_event.set()
exc_info = traceback.format_exc()
exception_queue.put((namespace, repr(e), exc_info)) # Add the exception to the queue
self.log_error("Exiting thread {}, put it into exception_queue {}".format(
threading.current_thread().getName(), exception_queue))
finally:
new_config_db_connector.close("CONFIG_DB")

Expand Down Expand Up @@ -988,6 +1005,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# set up state_db connector
state_db_connector = swsscommon.DBConnector("STATE_DB", 0)
config_db_connector = swsscommon.DBConnector("CONFIG_DB", 0)
exception_queue = Queue()

if self.DualToR:
self.log_info("Dual ToR mode")
Expand Down Expand Up @@ -1031,103 +1049,121 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):

# Get the ACL rule table seprator
acl_rule_table_seprator = subscribe_acl_rule_table.getTableNameSeparator()
try:
# Loop on select to see if any event happen on state db or config db of any namespace
while True:
# Periodically check for exceptions from child threads
try:
namespace, error, _ = exception_queue.get_nowait() # Non-blocking
self.log_error(f"Exception in namespace '{namespace}': {error}")
self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName()))
os.kill(os.getpid(), signal.SIGKILL)
except Empty:
# No exceptions in the queue
pass
(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue

# Loop on select to see if any event happen on state db or config db of any namespace
while True:
(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue

# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)

# Get the corresponding namespace and db_id from redisselect
namespace = redisSelectObj.getDbConnector().getNamespace()
db_id = redisSelectObj.getDbConnector().getDbId()

if db_id == state_db_id:
while True:
key, op, fvs = subscribe_bfd_session.pop()
if not key:
break
# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)

if op == 'SET' and not self.bfdAllowed:
self.allow_bfd_protocol(namespace)
self.bfdAllowed = True
sel.removeSelectable(subscribe_bfd_session)
# Get the corresponding namespace and db_id from redisselect
namespace = redisSelectObj.getDbConnector().getNamespace()
db_id = redisSelectObj.getDbConnector().getDbId()

if self.DualToR:
'''dhcp packet mark update'''
if db_id == state_db_id:
while True:
key, op, fvs = subscribe_dhcp_packet_mark.pop()
key, op, fvs = subscribe_bfd_session.pop()
if not key:
break
self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs)))

'''initial value is None'''
pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
cur_mark = None if op == 'DEL' else dict(fvs)['mark']
dhcp_packet_mark_tbl[key] = cur_mark
self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark)
if op == 'SET' and not self.bfdAllowed:
self.allow_bfd_protocol(namespace)
self.bfdAllowed = True
sel.removeSelectable(subscribe_bfd_session)

if self.DualToR:
'''dhcp packet mark update'''
while True:
key, op, fvs = subscribe_dhcp_packet_mark.pop()
if not key:
break
self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs)))

'''initial value is None'''
pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
cur_mark = None if op == 'DEL' else dict(fvs)['mark']
dhcp_packet_mark_tbl[key] = cur_mark
self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark)

'''mux cable update'''
while True:
key, op, fvs = subscribe_mux_cable.pop()
if not key:
break
self.log_info("mux cable update : '%s'" % str((key, op, fvs)))

mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
self.update_dhcp_acl(key, op, dict(fvs), mark)
continue

'''mux cable update'''
if db_id == config_db_id:
while True:
key, op, fvs = subscribe_mux_cable.pop()
key, op, fvs = subscribe_vxlan_table.pop()
if not key:
break
self.log_info("mux cable update : '%s'" % str((key, op, fvs)))
if op == 'SET' and not self.VxlanAllowed:
self.allow_vxlan_port(namespace, fvs)
elif op == 'DEL' and self.VxlanAllowed:
self.block_vxlan_port(namespace)

mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
self.update_dhcp_acl(key, op, dict(fvs), mark)
continue
ctrl_plane_acl_notification = set()

if db_id == config_db_id:
while True:
key, op, fvs = subscribe_vxlan_table.pop()
if not key:
break
if op == 'SET' and not self.VxlanAllowed:
self.allow_vxlan_port(namespace, fvs)
elif op == 'DEL' and self.VxlanAllowed:
self.block_vxlan_port(namespace)

ctrl_plane_acl_notification = set()

# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
(key, op, fvp) = table.pop()
# Pop of table that does not have data so break
if key == '':
break
# ACL Table notification. We will take Control Plane ACTION for any ACL Table Event
# This can be optimize further but we should not have many acl table set/del events in normal
# scenario
if acl_rule_table_seprator not in key:
ctrl_plane_acl_notification.add(namespace)
# Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane
else:
acl_table = key.split(acl_rule_table_seprator)[0]
if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE:
# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
(key, op, fvp) = table.pop()
# Pop of table that does not have data so break
if key == '':
break
# ACL Table notification. We will take Control Plane ACTION for any ACL Table Event
# This can be optimize further but we should not have many acl table set/del events in normal
# scenario
if acl_rule_table_seprator not in key:
ctrl_plane_acl_notification.add(namespace)

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace]))
self.update_thread[namespace].start()
# Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane
else:
acl_table = key.split(acl_rule_table_seprator)[0]
if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE:
ctrl_plane_acl_notification.add(namespace)

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace], exception_queue))
self.update_thread[namespace].start()
except Exception as e:
self.log_error("Exception occured at main thread due to {}".format(repr(e)))
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = traceback.format_exception(exc_type, exc_value, exc_traceback)
for tb_line in msg:
for tb_line_split in tb_line.splitlines():
self.log_error(tb_line_split)
self.log_error("Catch exception in main thread, generating SIGKILL for main thread")
os.kill(os.getpid(), signal.SIGKILL)

# ============================= Functions =============================

Expand Down

0 comments on commit e86e25a

Please sign in to comment.