Skip to content

Commit

Permalink
convert bgp stress link flap test to use asyncio (#14890)
Browse files Browse the repository at this point in the history
* convert bgp stress link flap test to use asyncio

Fix deadlocks in test_bgp_stress_link_flap.py by converting the
test to use asyncio instead of multi-threading. This also resolves
memory issues in VM environments caused by the test spawning
a large number of threads.

* Fixed bgp stress link flap logging

Fixed test_bgp_stress_link_flap.py logging to accurately record the number
of flaps for different interfaces.
  • Loading branch information
dayouliu1 authored and lipxu committed Dec 26, 2024
1 parent 1455587 commit e2528cb
Showing 1 changed file with 84 additions and 184 deletions.
268 changes: 84 additions & 184 deletions tests/bgp/test_bgp_stress_link_flap.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import asyncio
import logging
import pytest
import time
import traceback
import threading
from tests.common.platform.device_utils import fanout_switch_port_lookup
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import wait_until
from tests.common.utilities import InterruptableThread

logger = logging.getLogger(__name__)

Expand All @@ -15,7 +13,7 @@
pytest.mark.topology('t0', 't1')
]

stop_threads = False
stop_tasks = False
SLEEP_DURATION = 0.005
TEST_RUN_DURATION = 300
MEMORY_EXHAUST_THRESHOLD = 300
Expand Down Expand Up @@ -117,28 +115,26 @@ def setup(duthosts, rand_one_dut_hostname, nbrhosts, fanouthosts):
"Not all BGP sessions are established on DUT")


async def flap_dut_interface(duthost, port, sleep_duration, test_run_duration):
    """Flap one DUT port in a loop until asked to stop or until the time budget is spent.

    One flap is ``duthost.shutdown(port)`` followed by ``duthost.no_shutdown(port)``,
    with an ``asyncio.sleep(sleep_duration)`` after each call. Every completed flap
    adds one to the module-level ``dut_flap_count``.

    Args:
        duthost: Object providing ``shutdown(port)`` and ``no_shutdown(port)``.
        port: Interface name passed to those two calls.
        sleep_duration: Seconds to sleep after each shutdown / no_shutdown call.
        test_run_duration: Maximum number of seconds to keep flapping.
    """
    # The counter is deliberately not reset here: the test body zeroes it once
    # before creating one task per interface, and all of those tasks add to it.
    global dut_flap_count

    logger.info("flap dut {} interface {} delay time {} timeout {}".format(
        duthost, port, sleep_duration, test_run_duration))

    start_time = time.time()  # Record the start time
    while not stop_tasks and time.time() - start_time < test_run_duration:
        # NOTE(review): shutdown/no_shutdown are called without await, so other
        # tasks only get a chance to run during the sleeps below - confirm that
        # this is acceptable for the hosts used here.
        duthost.shutdown(port)
        await asyncio.sleep(sleep_duration)
        duthost.no_shutdown(port)
        await asyncio.sleep(sleep_duration)
        dut_flap_count += 1
        if stop_tasks:
            logger.info("Stop flap task, breaking dut flap dut {} interface {} flap count {}".format(
                duthost, port, dut_flap_count))
            break


def flap_fanout_interface_all(interface_list, fanouthosts, duthost, stop_event, sleep_duration, test_run_duration):
async def flap_fanout_interface_all(interface_list, fanouthosts, duthost, sleep_duration, test_run_duration):
global fanout_flap_count
fanout_flap_count = 0
fanout_interfaces = {}

for port in interface_list:
Expand All @@ -151,75 +147,71 @@ def flap_fanout_interface_all(interface_list, fanouthosts, duthost, stop_event,
logger.info("flap interface fanout port {}".format(fanout_interfaces))

start_time = time.time() # Record the start time
while not stop_event.is_set() and time.time() - start_time < test_run_duration:
while not stop_tasks and time.time() - start_time < test_run_duration:
for fanout_host, fanout_ports in fanout_interfaces.items():
logger.info("flap interface fanout {} port {}".format(fanout_host, fanout_port))

fanout_host.shutdown_multiple(fanout_ports)
time.sleep(sleep_duration)
await asyncio.sleep(sleep_duration)
fanout_host.no_shutdown_multiple(fanout_ports)
time.sleep(sleep_duration)
await asyncio.sleep(sleep_duration)

fanout_flap_count += 1
if stop_threads:
logger.info("Stop flap thread, breaking flap fanout {} dut {} flap count {}".format(
if stop_tasks:
logger.info("Stop flap task, breaking flap fanout {} dut {} flap count {}".format(
fanouthosts, duthost, fanout_flap_count))
break


async def flap_fanout_interface(interface_list, fanouthosts, duthost, sleep_duration, test_run_duration):
    """Flap the fanout-side port of every DUT interface, sweep after sweep.

    A sweep walks ``interface_list``; for each DUT port it looks up the fanout
    host and fanout port with ``fanout_switch_port_lookup`` and, when both are
    found, calls ``shutdown`` then ``no_shutdown`` on the fanout with an
    ``asyncio.sleep(sleep_duration)`` after each call. Ports without a fanout
    match are logged with a warning and skipped. Every sweep adds one to the
    module-level ``fanout_flap_count``.

    Sweeps repeat until ``stop_tasks`` becomes true or ``test_run_duration``
    seconds have elapsed.

    Args:
        interface_list: Iterable of DUT port names.
        fanouthosts: Passed through to ``fanout_switch_port_lookup``.
        duthost: Object whose ``hostname`` is used for the fanout lookup.
        sleep_duration: Seconds to sleep after each shutdown / no_shutdown call.
        test_run_duration: Maximum number of seconds to keep flapping.
    """
    # Not reset here: the test body zeroes the counter before starting the task.
    global fanout_flap_count

    # Pre-bind so the stop log below cannot hit an unbound name when
    # interface_list is empty.
    port = None

    start_time = time.time()  # Record the start time
    while not stop_tasks and time.time() - start_time < test_run_duration:
        for port in interface_list:
            fanout, fanout_port = fanout_switch_port_lookup(fanouthosts, duthost.hostname, port)
            if fanout and fanout_port:
                logger.info("flap interface fanout {} port {}".format(fanout, fanout_port))
                fanout.shutdown(fanout_port)
                await asyncio.sleep(sleep_duration)
                fanout.no_shutdown(fanout_port)
                await asyncio.sleep(sleep_duration)
            else:
                logger.warning("fanout not found for {} port {}".format(duthost.hostname, port))

            # Abandon the rest of the sweep as soon as a stop is requested.
            if stop_tasks:
                break

        fanout_flap_count += 1
        if stop_tasks:
            logger.info("Stop flap task, breaking flap fanout {} dut {} interface {} flap count {}".format(
                fanouthosts, duthost, port, fanout_flap_count))
            break

        # A sweep that found no fanout port (or an empty interface_list) never
        # awaited; yield once so other tasks are not starved by this loop.
        await asyncio.sleep(0)


async def flap_neighbor_interface(neighbor, neighbor_port, sleep_duration, test_run_duration):
    """Flap one neighbor port in a loop until asked to stop or until the time budget is spent.

    One flap is ``neighbor.shutdown(neighbor_port)`` followed by
    ``neighbor.no_shutdown(neighbor_port)``, with an
    ``asyncio.sleep(sleep_duration)`` after each call. Every completed flap adds
    one to the module-level ``neighbor_flap_count``.

    Args:
        neighbor: Object providing ``shutdown(port)`` and ``no_shutdown(port)``.
        neighbor_port: Interface name passed to those two calls.
        sleep_duration: Seconds to sleep after each shutdown / no_shutdown call.
        test_run_duration: Maximum number of seconds to keep flapping.
    """
    # The counter is deliberately not reset here: the test body zeroes it once
    # before creating one task per neighbor, and all of those tasks add to it.
    global neighbor_flap_count

    logger.info("flap neighbor {} interface {}".format(neighbor, neighbor_port))

    start_time = time.time()  # Record the start time
    while not stop_tasks and time.time() - start_time < test_run_duration:
        neighbor.shutdown(neighbor_port)
        await asyncio.sleep(sleep_duration)
        neighbor.no_shutdown(neighbor_port)
        await asyncio.sleep(sleep_duration)
        neighbor_flap_count += 1
        if stop_tasks:
            logger.info("Stop flap task, breaking flap neighbor {} interface {} flap count {}".format(
                neighbor, neighbor_port, neighbor_flap_count))
            break


@pytest.mark.parametrize("test_type", ["dut", "fanout", "neighbor", "all"])
def test_bgp_stress_link_flap(duthosts, rand_one_dut_hostname, setup, nbrhosts, fanouthosts, test_type,
get_function_completeness_level):
global stop_threads
global stop_tasks
global dut_flap_count
global fanout_flap_count
global neighbor_flap_count
global TEST_RUN_DURATION

duthost = duthosts[rand_one_dut_hostname]

Expand All @@ -243,152 +235,60 @@ def test_bgp_stress_link_flap(duthosts, rand_one_dut_hostname, setup, nbrhosts,
interface_list = eth_nbrs.keys()
logger.debug('interface_list: {}'.format(interface_list))

stop_threads = False
stop_tasks = False
dut_flap_count = 0
fanout_flap_count = 0
neighbor_flap_count = 0
# Create a stop event
stop_event = threading.Event()

flap_threads = []

if test_type == "dut":
for interface in interface_list:
thread = InterruptableThread(
target=flap_dut_interface,
args=(duthost, interface, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread.daemon = True
thread.start()
logger.info("Start flap thread {} dut {} interface {}".format(thread, duthost, interface))
flap_threads.append(thread)
# create only one thread for vs due to memory resource limitation
if asic_type == "vs":
break
elif test_type == "fanout":
thread = InterruptableThread(
target=flap_fanout_interface,
args=(interface_list, fanouthosts, duthost, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread.daemon = True
thread.start()
logger.info("Start flap thread {} fanout {} dut {}".format(thread, fanouthosts, duthost))
flap_threads.append(thread)
elif test_type == "neighbor":
for interface in interface_list:
neighbor_name = eth_nbrs[interface]["name"]
neighbor_port = eth_nbrs[interface]["port"]
neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None)
if neighbor_host:
thread = InterruptableThread(
target=flap_neighbor_interface,
args=(neighbor_host, neighbor_port, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread.daemon = True
thread.start()
logger.info("Start flap thread {} neighbor {} port {}".format(thread, neighbor_host, neighbor_port))
flap_threads.append(thread)
else:
logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port))
# create only one thread for vs due to memory resource limitation
if asic_type == "vs":
break
elif test_type == "all":
for interface in interface_list:
logger.info("shutdown all interface {} ".format(interface))
thread_dut = InterruptableThread(
target=flap_dut_interface,
args=(duthost, interface, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread_dut.daemon = True
thread_dut.start()
logger.info("Start flap thread {} dut {} interface {}".format(thread_dut, duthost, interface))
flap_threads.append(thread_dut)

neighbor_name = eth_nbrs[interface]["name"]
neighbor_port = eth_nbrs[interface]["port"]
neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None)
if neighbor_host:
thread_neighbor = InterruptableThread(
target=flap_neighbor_interface,
args=(neighbor_host, neighbor_port, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread_neighbor.daemon = True
thread_neighbor.start()
logger.info("Start flap thread {} neighbor {} port {}".format(
thread_neighbor, neighbor_host, neighbor_port))
flap_threads.append(thread_neighbor)
else:
logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port))

thread_fanout = InterruptableThread(
target=flap_fanout_interface,
args=(interface_list, fanouthosts, duthost, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread_fanout.daemon = True
thread_fanout.start()
logger.info("Start flap thread {} fanout {} dut {} ".format(
thread_fanout, fanouthosts, duthost))
flap_threads.append(thread_fanout)

logger.info("flap_threads {} ".format(flap_threads))

avail_mem_list = []
end_time = time.time() + TEST_RUN_DURATION
while time.time() < end_time:
time.sleep(30)

cmd = "free -m"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))
lines = cmd_out.split('\n')
for line in lines:
if line.startswith("Mem:"):
fields = line.split()
total_mem, avail_mem = int(fields[1]), int(fields[-1])
logger.info("Total memory {} Available memory: {}".format(total_mem, avail_mem))
avail_mem_list.append(avail_mem)
break

if avail_mem < MEMORY_EXHAUST_THRESHOLD:
logger.error("Available memory {} is less than {}, stopping the test".format(
avail_mem, MEMORY_EXHAUST_THRESHOLD))

cmd = "top -b -n 1"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))

cmd = "sudo monit status"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))

cmd = "docker stats --no-stream"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))

break

logger.info("Test running for {} seconds".format(time.time() + TEST_RUN_DURATION - end_time))
logger.info("Test run duration dut_flap_count {} fanout_flap_count {} neighbor_flap_count {}".format(
dut_flap_count, fanout_flap_count, neighbor_flap_count))
stop_event.set()
stop_threads = True
logger.info("stop_threads {} ".format(flap_threads))
time.sleep(30)

for thread in flap_threads:
logger.info("waiting thread {} done".format(thread))
try:
if thread.is_alive():
thread.join(timeout=30)
logger.info("thread {} joined".format(thread))
except Exception as e:
logger.debug("Exception occurred in thread %r:", thread)
logger.debug("".join(traceback.format_exception(None, e, e.__traceback__)))

# Clean up the thread list after joining all threads
logger.info("clear threads {} ".format(flap_threads))
flap_threads.clear()
logger.debug("avail_mem history {} ".format(avail_mem_list))

def check_test_type(match_type):
return test_type in [match_type, "all"]

async def flap_interfaces():
flap_tasks = []
if check_test_type("dut"):
for interface in interface_list:
task = asyncio.create_task(
flap_dut_interface(duthost, interface, delay_time, TEST_RUN_DURATION))
logger.info("Start flap dut {} interface {}".format(duthost, interface))
flap_tasks.append(task)

if check_test_type("neighbor"):
for interface in interface_list:
neighbor_name = eth_nbrs[interface]["name"]
neighbor_port = eth_nbrs[interface]["port"]
neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None)
if neighbor_host:
task = asyncio.create_task(
flap_neighbor_interface(neighbor_host, neighbor_port, delay_time, TEST_RUN_DURATION))
logger.info("Start flap neighbor {} port {}".format(neighbor_host, neighbor_port))
flap_tasks.append(task)
else:
logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port))

if check_test_type("fanout"):
task = asyncio.create_task(
flap_fanout_interface(interface_list, fanouthosts, duthost, delay_time, TEST_RUN_DURATION))
logger.info("Start flap fanout {} dut {} ".format(fanouthosts, duthost))
flap_tasks.append(task)

logger.info("flap_tasks {} ".format(flap_tasks))
start_time = time.time()

await asyncio.sleep(TEST_RUN_DURATION)

global stop_tasks
stop_tasks = True
logger.info("stop_tasks {} ".format(flap_tasks))

await asyncio.gather(*flap_tasks)

logger.info("Test running for {} seconds".format(time.time() - start_time))
logger.info("Test run duration dut_flap_count {} fanout_flap_count {} neighbor_flap_count {}".format(
dut_flap_count, fanout_flap_count, neighbor_flap_count))

# Clean up the task list after joining all tasks
logger.info("clear tasks {} ".format(flap_tasks))
flap_tasks.clear()

asyncio.run(flap_interfaces())
return

0 comments on commit e2528cb

Please sign in to comment.