From e2528cbb2cc1d642d1a174aa0bf04f8c91163b3e Mon Sep 17 00:00:00 2001 From: Dayou Liu <113053330+dayouliu1@users.noreply.github.com> Date: Wed, 16 Oct 2024 19:39:19 -0700 Subject: [PATCH] convert bgp stress link flap test to use asyncio (#14890) * convert bgp stress link flap test to use asyncio Fix test_bgp_stress_link_flap.py deadlocks by converting test to use asyncio instead of multi-threading. This will also resolve memory issues on VM environments caused by the test spawning a large number of threads * Fixed bgp stress link flap logging Fixed test_bgp_stress_link_flap.py logging to accurately record number of flaps for different interfaces. --- tests/bgp/test_bgp_stress_link_flap.py | 268 ++++++++----------------- 1 file changed, 84 insertions(+), 184 deletions(-) diff --git a/tests/bgp/test_bgp_stress_link_flap.py b/tests/bgp/test_bgp_stress_link_flap.py index e45619f9215..becbbd6973b 100644 --- a/tests/bgp/test_bgp_stress_link_flap.py +++ b/tests/bgp/test_bgp_stress_link_flap.py @@ -1,12 +1,10 @@ +import asyncio import logging import pytest import time -import traceback -import threading from tests.common.platform.device_utils import fanout_switch_port_lookup from tests.common.helpers.assertions import pytest_assert from tests.common.utilities import wait_until -from tests.common.utilities import InterruptableThread logger = logging.getLogger(__name__) @@ -15,7 +13,7 @@ pytest.mark.topology('t0', 't1') ] -stop_threads = False +stop_tasks = False SLEEP_DURATION = 0.005 TEST_RUN_DURATION = 300 MEMORY_EXHAUST_THRESHOLD = 300 @@ -117,28 +115,26 @@ def setup(duthosts, rand_one_dut_hostname, nbrhosts, fanouthosts): "Not all BGP sessions are established on DUT") -def flap_dut_interface(duthost, port, stop_event, sleep_duration, test_run_duration): +async def flap_dut_interface(duthost, port, sleep_duration, test_run_duration): logger.info("flap dut {} interface {} delay time {} timeout {}".format( duthost, port, sleep_duration, test_run_duration)) global dut_flap_count - dut_flap_count = 0 start_time = time.time() # Record the start time - while not stop_event.is_set() and time.time() - start_time < test_run_duration: + while not stop_tasks and time.time() - start_time < test_run_duration: duthost.shutdown(port) - time.sleep(sleep_duration) + await asyncio.sleep(sleep_duration) duthost.no_shutdown(port) - time.sleep(sleep_duration) - if stop_threads: - logger.info("Stop flap thread, breaking dut flap dut {} interface {} flap count {}".format( + await asyncio.sleep(sleep_duration) + dut_flap_count += 1 + if stop_tasks: + logger.info("Stop flap task, breaking dut flap dut {} interface {} flap count {}".format( duthost, port, dut_flap_count)) break - dut_flap_count += 1 -def flap_fanout_interface_all(interface_list, fanouthosts, duthost, stop_event, sleep_duration, test_run_duration): +async def flap_fanout_interface_all(interface_list, fanouthosts, duthost, sleep_duration, test_run_duration): global fanout_flap_count - fanout_flap_count = 0 fanout_interfaces = {} for port in interface_list: @@ -151,75 +147,71 @@ def flap_fanout_interface_all(interface_list, fanouthosts, duthost, stop_event, logger.info("flap interface fanout port {}".format(fanout_interfaces)) start_time = time.time() # Record the start time - while not stop_event.is_set() and time.time() - start_time < test_run_duration: + while not stop_tasks and time.time() - start_time < test_run_duration: for fanout_host, fanout_ports in fanout_interfaces.items(): logger.info("flap interface fanout {} port {}".format(fanout_host, fanout_port)) - fanout_host.shutdown_multiple(fanout_ports) - time.sleep(sleep_duration) + await asyncio.sleep(sleep_duration) fanout_host.no_shutdown_multiple(fanout_ports) - time.sleep(sleep_duration) + await asyncio.sleep(sleep_duration) fanout_flap_count += 1 - if stop_threads: - logger.info("Stop flap thread, breaking flap fanout {} dut {} flap count {}".format( + if stop_tasks: + logger.info("Stop flap task, breaking flap fanout {} dut {} flap count {}".format( fanouthosts, duthost, fanout_flap_count)) break -def flap_fanout_interface(interface_list, fanouthosts, duthost, stop_event, sleep_duration, test_run_duration): +async def flap_fanout_interface(interface_list, fanouthosts, duthost, sleep_duration, test_run_duration): global fanout_flap_count - fanout_flap_count = 0 start_time = time.time() # Record the start time - while not stop_event.is_set() and time.time() - start_time < test_run_duration: + while not stop_tasks and time.time() - start_time < test_run_duration: for port in interface_list: - if stop_threads: - break - fanout, fanout_port = fanout_switch_port_lookup(fanouthosts, duthost.hostname, port) if fanout and fanout_port: logger.info("flap interface fanout {} port {}".format(fanout, fanout_port)) fanout.shutdown(fanout_port) - time.sleep(sleep_duration) + await asyncio.sleep(sleep_duration) fanout.no_shutdown(fanout_port) - time.sleep(sleep_duration) + await asyncio.sleep(sleep_duration) else: logger.warning("fanout not found for {} port {}".format(duthost.hostname, port)) + if stop_tasks: + break + fanout_flap_count += 1 - if stop_threads: - logger.info("Stop flap thread, breaking flap fanout {} dut {} interface {} flap count {}".format( + if stop_tasks: + logger.info("Stop flap task, breaking flap fanout {} dut {} interface {} flap count {}".format( fanouthosts, duthost, port, fanout_flap_count)) break -def flap_neighbor_interface(neighbor, neighbor_port, stop_event, sleep_duration, test_run_duration): +async def flap_neighbor_interface(neighbor, neighbor_port, sleep_duration, test_run_duration): logger.info("flap neighbor {} interface {}".format(neighbor, neighbor_port)) global neighbor_flap_count - neighbor_flap_count = 0 start_time = time.time() # Record the start time - while not stop_event.is_set() and time.time() - start_time < test_run_duration: + while not stop_tasks and time.time() - start_time < test_run_duration: neighbor.shutdown(neighbor_port) - time.sleep(sleep_duration) + await asyncio.sleep(sleep_duration) neighbor.no_shutdown(neighbor_port) - time.sleep(sleep_duration) - if stop_threads: - logger.info("Stop flap thread, breaking flap neighbor {} interface {} flap count {}".format( + await asyncio.sleep(sleep_duration) + neighbor_flap_count += 1 + if stop_tasks: + logger.info("Stop flap task, breaking flap neighbor {} interface {} flap count {}".format( neighbor, neighbor_port, neighbor_flap_count)) break - neighbor_flap_count += 1 @pytest.mark.parametrize("test_type", ["dut", "fanout", "neighbor", "all"]) def test_bgp_stress_link_flap(duthosts, rand_one_dut_hostname, setup, nbrhosts, fanouthosts, test_type, get_function_completeness_level): - global stop_threads + global stop_tasks global dut_flap_count global fanout_flap_count global neighbor_flap_count - global TEST_RUN_DURATION duthost = duthosts[rand_one_dut_hostname] @@ -243,152 +235,60 @@ def test_bgp_stress_link_flap(duthosts, rand_one_dut_hostname, setup, nbrhosts, interface_list = eth_nbrs.keys() logger.debug('interface_list: {}'.format(interface_list)) - stop_threads = False + stop_tasks = False dut_flap_count = 0 fanout_flap_count = 0 neighbor_flap_count = 0 - # Create a stop event - stop_event = threading.Event() - - flap_threads = [] - - if test_type == "dut": - for interface in interface_list: - thread = InterruptableThread( - target=flap_dut_interface, - args=(duthost, interface, stop_event, delay_time, TEST_RUN_DURATION,) - ) - thread.daemon = True - thread.start() - logger.info("Start flap thread {} dut {} interface {}".format(thread, duthost, interface)) - flap_threads.append(thread) - # create only one thread for vs due to memory resource limitation - if asic_type == "vs": - break - elif test_type == "fanout": - thread = InterruptableThread( - target=flap_fanout_interface, - args=(interface_list, fanouthosts, duthost, stop_event, delay_time, TEST_RUN_DURATION,) - ) - thread.daemon = True - thread.start() - logger.info("Start flap thread {} fanout {} dut {}".format(thread, fanouthosts, duthost)) - flap_threads.append(thread) - elif test_type == "neighbor": - for interface in interface_list: - neighbor_name = eth_nbrs[interface]["name"] - neighbor_port = eth_nbrs[interface]["port"] - neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None) - if neighbor_host: - thread = InterruptableThread( - target=flap_neighbor_interface, - args=(neighbor_host, neighbor_port, stop_event, delay_time, TEST_RUN_DURATION,) - ) - thread.daemon = True - thread.start() - logger.info("Start flap thread {} neighbor {} port {}".format(thread, neighbor_host, neighbor_port)) - flap_threads.append(thread) - else: - logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port)) - # create only one thread for vs due to memory resource limitation - if asic_type == "vs": - break - elif test_type == "all": - for interface in interface_list: - logger.info("shutdown all interface {} ".format(interface)) - thread_dut = InterruptableThread( - target=flap_dut_interface, - args=(duthost, interface, stop_event, delay_time, TEST_RUN_DURATION,) - ) - thread_dut.daemon = True - thread_dut.start() - logger.info("Start flap thread {} dut {} interface {}".format(thread_dut, duthost, interface)) - flap_threads.append(thread_dut) - - neighbor_name = eth_nbrs[interface]["name"] - neighbor_port = eth_nbrs[interface]["port"] - neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None) - if neighbor_host: - thread_neighbor = InterruptableThread( - target=flap_neighbor_interface, - args=(neighbor_host, neighbor_port, stop_event, delay_time, TEST_RUN_DURATION,) - ) - thread_neighbor.daemon = True - thread_neighbor.start() - logger.info("Start flap thread {} neighbor {} port {}".format( - thread_neighbor, neighbor_host, neighbor_port)) - flap_threads.append(thread_neighbor) - else: - logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port)) - - thread_fanout = InterruptableThread( - target=flap_fanout_interface, - args=(interface_list, fanouthosts, duthost, stop_event, delay_time, TEST_RUN_DURATION,) - ) - thread_fanout.daemon = True - thread_fanout.start() - logger.info("Start flap thread {} fanout {} dut {} ".format( - thread_fanout, fanouthosts, duthost)) - flap_threads.append(thread_fanout) - - logger.info("flap_threads {} ".format(flap_threads)) - - avail_mem_list = [] - end_time = time.time() + TEST_RUN_DURATION - while time.time() < end_time: - time.sleep(30) - - cmd = "free -m" - cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None) - logger.info("cmd {} response: {}".format(cmd, cmd_out)) - lines = cmd_out.split('\n') - for line in lines: - if line.startswith("Mem:"): - fields = line.split() - total_mem, avail_mem = int(fields[1]), int(fields[-1]) - logger.info("Total memory {} Available memory: {}".format(total_mem, avail_mem)) - avail_mem_list.append(avail_mem) - break - - if avail_mem < MEMORY_EXHAUST_THRESHOLD: - logger.error("Available memory {} is less than {}, stopping the test".format( - avail_mem, MEMORY_EXHAUST_THRESHOLD)) - - cmd = "top -b -n 1" - cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None) - logger.info("cmd {} response: {}".format(cmd, cmd_out)) - - cmd = "sudo monit status" - cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None) - logger.info("cmd {} response: {}".format(cmd, cmd_out)) - - cmd = "docker stats --no-stream" - cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None) - logger.info("cmd {} response: {}".format(cmd, cmd_out)) - - break - - logger.info("Test running for {} seconds".format(time.time() + TEST_RUN_DURATION - end_time)) - logger.info("Test run duration dut_flap_count {} fanout_flap_count {} neighbor_flap_count {}".format( - dut_flap_count, fanout_flap_count, neighbor_flap_count)) - stop_event.set() - stop_threads = True - logger.info("stop_threads {} ".format(flap_threads)) - time.sleep(30) - - for thread in flap_threads: - logger.info("waiting thread {} done".format(thread)) - try: - if thread.is_alive(): - thread.join(timeout=30) - logger.info("thread {} joined".format(thread)) - except Exception as e: - logger.debug("Exception occurred in thread %r:", thread) - logger.debug("".join(traceback.format_exception(None, e, e.__traceback__))) - - # Clean up the thread list after joining all threads - logger.info("clear threads {} ".format(flap_threads)) - flap_threads.clear() - logger.debug("avail_mem history {} ".format(avail_mem_list)) + def check_test_type(match_type): + return test_type in [match_type, "all"] + + async def flap_interfaces(): + flap_tasks = [] + if check_test_type("dut"): + for interface in interface_list: + task = asyncio.create_task( + flap_dut_interface(duthost, interface, delay_time, TEST_RUN_DURATION)) + logger.info("Start flap dut {} interface {}".format(duthost, interface)) + flap_tasks.append(task) + + if check_test_type("neighbor"): + for interface in interface_list: + neighbor_name = eth_nbrs[interface]["name"] + neighbor_port = eth_nbrs[interface]["port"] + neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None) + if neighbor_host: + task = asyncio.create_task( + flap_neighbor_interface(neighbor_host, neighbor_port, delay_time, TEST_RUN_DURATION)) + logger.info("Start flap neighbor {} port {}".format(neighbor_host, neighbor_port)) + flap_tasks.append(task) + else: + logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port)) + + if check_test_type("fanout"): + task = asyncio.create_task( + flap_fanout_interface(interface_list, fanouthosts, duthost, delay_time, TEST_RUN_DURATION)) + logger.info("Start flap fanout {} dut {} ".format(fanouthosts, duthost)) + flap_tasks.append(task) + + logger.info("flap_tasks {} ".format(flap_tasks)) + start_time = time.time() + + await asyncio.sleep(TEST_RUN_DURATION) + + global stop_tasks + stop_tasks = True + logger.info("stop_tasks {} ".format(flap_tasks)) + + await asyncio.gather(*flap_tasks) + + logger.info("Test running for {} seconds".format(time.time() - start_time)) + logger.info("Test run duration dut_flap_count {} fanout_flap_count {} neighbor_flap_count {}".format( + dut_flap_count, fanout_flap_count, neighbor_flap_count)) + + # Clean up the task list after joining all tasks + logger.info("clear tasks {} ".format(flap_tasks)) + flap_tasks.clear() + + asyncio.run(flap_interfaces()) return