Skip to content

Commit

Permalink
Added max-concurrency CLI option
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon committed May 16, 2024
1 parent a6d50c0 commit 8bd60c4
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
10 changes: 10 additions & 0 deletions anta/cli/nrfu/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,21 @@ def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]:
is_flag=True,
default=False,
)
@click.option(
"--max-concurrency",
help="Maximum number of tests to run concurrently.",
type=int,
show_envvar=True,
default=50000,
show_default=True,
)
# pylint: disable=too-many-arguments
def nrfu(
ctx: click.Context,
inventory: AntaInventory,
tags: set[str] | None,
catalog: AntaCatalog,
max_concurrency: int,
device: tuple[str],
test: tuple[str],
hide: tuple[str],
Expand All @@ -138,6 +147,7 @@ def nrfu(
ctx.obj["result_manager"],
inventory,
catalog,
max_concurrency=max_concurrency,
tags=tags,
devices=set(device) if device else None,
tests=set(test) if test else None,
Expand Down
4 changes: 4 additions & 0 deletions anta/result_manager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from __future__ import annotations

from typing import Any

from pydantic import BaseModel

from anta.custom_types import TestStatus
Expand Down Expand Up @@ -32,6 +34,8 @@ class TestResult(BaseModel):
result: TestStatus = "unset"
messages: list[str] = []
custom_field: str | None = None
json_output: dict[str, Any] | None = None
text_output: str | None = None

def is_success(self, message: str | None = None) -> None:
"""Set status to success.
Expand Down
27 changes: 18 additions & 9 deletions anta/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
logger = logging.getLogger(__name__)

DEFAULT_NOFILE = 16384
MAXIMUM_TEST_CONCURRENCY = 20000


def adjust_rlimit_nofile() -> tuple[int, int]:
Expand Down Expand Up @@ -93,16 +92,17 @@ async def run_tests(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResu
------
TestResult: The result of each completed test.
"""
aws = aiter(tests_generator)
# NOTE: The `aiter` built-in function is not available in Python 3.9
aws = tests_generator.__aiter__() # pylint: disable=unnecessary-dunder-call
aws_ended = False
pending: set[Future[TestResult]] = set()

while pending or not aws_ended:
# Add tests to the pending set until the limit is reached or no more tests are available
while len(pending) < limit and not aws_ended:
try:
# Get the next test coroutine from the generator
aw = await anext(aws)
# NOTE: The `anext` built-in function is not available in Python 3.9
aw = await aws.__anext__() # pylint: disable=unnecessary-dunder-call
except StopAsyncIteration: # noqa: PERF203
aws_ended = True
logger.debug("All tests have been added to the pending set.")
Expand Down Expand Up @@ -246,6 +246,7 @@ async def main( # noqa: PLR0913
manager: ResultManager,
inventory: AntaInventory,
catalog: AntaCatalog,
max_concurrency: int = 50000,
devices: set[str] | None = None,
tests: set[str] | None = None,
tags: set[str] | None = None,
Expand All @@ -264,6 +265,7 @@ async def main( # noqa: PLR0913
manager: ResultManager object to populate with the test results.
inventory: AntaInventory object that includes the device(s).
catalog: AntaCatalog object that includes the list of tests.
max_concurrency: Maximum number of tests to run concurrently. Default is 50000.
devices: Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU.
tests: Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU.
tags: Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU.
Expand All @@ -289,18 +291,25 @@ async def main( # noqa: PLR0913
return

run_info = (
"--- ANTA NRFU Run Information ---\n"
"------------------------------------ ANTA NRFU Run Information -------------------------------------\n"
f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n"
f"Total number of selected tests: {catalog.final_tests_count}\n"
f"Maximum number of tests to run concurrently: {max_concurrency}\n"
f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n"
"---------------------------------"
"----------------------------------------------------------------------------------------------------"
)

logger.info(run_info)

if catalog.final_tests_count > limits[0]:
if catalog.final_tests_count > max_concurrency:
logger.warning(
"The number of concurrent tests is higher than the open file descriptors limit for this ANTA process.\n"
"The total number of tests is higher than the maximum number of tests to run concurrently.\n"
"ANTA will be throttled to run at the maximum number of tests to run concurrently to ensure system stability.\n"
"Please consult the ANTA FAQ."
)
if max_concurrency > limits[0]:
logger.warning(
"The maximum number of tests to run concurrently is higher than the open file descriptors limit for this ANTA process.\n"
"Errors may occur while running the tests.\n"
"Please consult the ANTA FAQ."
)
Expand All @@ -317,7 +326,7 @@ async def main( # noqa: PLR0913
AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=catalog.final_tests_count)

with Catchtime(logger=logger, message="Running ANTA tests"):
async for result in run_tests(tests_generator, limit=MAXIMUM_TEST_CONCURRENCY):
async for result in run_tests(tests_generator, limit=max_concurrency):
manager.add(result)

log_cache_statistics(selected_inventory.devices)

0 comments on commit 8bd60c4

Please sign in to comment.