From 83206e884b0e007dfe7febc6b8373b4775c0639a Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 16 May 2024 00:41:23 -0400 Subject: [PATCH 01/13] feat(anta): limit the number of tests run concurrently --- anta/runner.py | 170 +++++++++++++++++++++++++++++-------- tests/units/test_runner.py | 9 +- 2 files changed, 139 insertions(+), 40 deletions(-) diff --git a/anta/runner.py b/anta/runner.py index dcb2d962e..e011dd577 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -18,7 +18,8 @@ from anta.tools import Catchtime, cprofile if TYPE_CHECKING: - from collections.abc import Coroutine + from asyncio import Task + from collections.abc import AsyncGenerator, Coroutine from anta.catalog import AntaCatalog, AntaTestDefinition from anta.device import AntaDevice @@ -29,6 +30,30 @@ logger = logging.getLogger(__name__) DEFAULT_NOFILE = 16384 +"""Default number of open file descriptors for the ANTA process.""" +DEFAULT_MAX_CONCURRENCY = 10000 +"""Default maximum number of tests to run concurrently.""" +DEFAULT_MAX_CONNECTIONS = 100 +"""Default underlying HTTPX client maximum number of connections per device.""" + + +def adjust_max_concurrency() -> int: + """Adjust the maximum number of tests (coroutines) to run concurrently. + + The limit is set to the value of the ANTA_MAX_CONCURRENCY environment variable. + + If the `ANTA_MAX_CONCURRENCY` environment variable is not set or is invalid, `DEFAULT_MAX_CONCURRENCY` is used. + + Returns + ------- + The maximum number of tests to run concurrently. 
+ """ + try: + max_concurrency = int(os.environ.get("ANTA_MAX_CONCURRENCY", DEFAULT_MAX_CONCURRENCY)) + except ValueError as exception: + logger.warning("The ANTA_MAX_CONCURRENCY environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_MAX_CONCURRENCY) + max_concurrency = DEFAULT_MAX_CONCURRENCY + return max_concurrency def adjust_rlimit_nofile() -> tuple[int, int]: @@ -40,7 +65,6 @@ def adjust_rlimit_nofile() -> tuple[int, int]: Returns ------- - tuple[int, int] The new soft and hard limits for open file descriptors. """ try: @@ -77,6 +101,61 @@ def log_cache_statistics(devices: list[AntaDevice]) -> None: logger.info("Caching is not enabled on %s", device.name) +async def run(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResult], None], limit: int) -> AsyncGenerator[TestResult, None]: + """Run tests with a concurrency limit. + + This function takes an asynchronous generator of test coroutines and runs them + with a limit on the number of concurrent tests. It yields test results as each + test completes. + + Inspired by: https://death.andgravity.com/limit-concurrency + + Parameters + ---------- + tests_generator + An asynchronous generator that yields test coroutines. + limit + The maximum number of concurrent tests to run. + + Yields + ------ + The result of each completed test. 
+ """ + # NOTE: The `aiter` built-in function is not available in Python 3.9 + aws = tests_generator.__aiter__() # pylint: disable=unnecessary-dunder-call + aws_ended = False + pending: set[Task[TestResult]] = set() + + while pending or not aws_ended: + # Add tests to the pending set until the limit is reached or no more tests are available + while len(pending) < limit and not aws_ended: + try: + # NOTE: The `anext` built-in function is not available in Python 3.9 + aw = await aws.__anext__() # pylint: disable=unnecessary-dunder-call + except StopAsyncIteration: # noqa: PERF203 + aws_ended = True + logger.debug("All tests have been added to the pending set.") + else: + # Ensure the coroutine is scheduled to run and add it to the pending set + pending.add(asyncio.create_task(aw)) + logger.debug("Added a test to the pending set: %s", aw) + + if len(pending) >= limit: + logger.debug("Concurrency limit reached: %s tests running. Waiting for tests to complete.", limit) + + if not pending: + logger.debug("No pending tests and all tests have been processed. Exiting.") + return + + # Wait for at least one of the pending tests to complete + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + logger.debug("Completed %s test(s). Pending count: %s", len(done), len(pending)) + + # Yield results of completed tests + while done: + yield await done.pop() + + async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devices: set[str] | None, *, established_only: bool) -> AntaInventory | None: """Set up the inventory for the ANTA run. @@ -93,8 +172,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic Returns ------- - AntaInventory | None - The filtered inventory or None if there are no devices to run tests on. + The filtered AntaInventory or None if there are no devices to run tests on. 
""" if len(inventory) == 0: logger.info("The inventory is empty, exiting") @@ -119,10 +197,10 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic return selected_inventory -def prepare_tests( +def setup_tests( inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None -) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None: - """Prepare the tests to run. +) -> tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None]: + """Set up the tests for the ANTA run. Parameters ---------- @@ -137,17 +215,17 @@ def prepare_tests( Returns ------- - defaultdict[AntaDevice, set[AntaTestDefinition]] | None - A mapping of devices to the tests to run or None if there are no tests to run. + The total number of tests and a mapping of devices to the tests to run or None if there are no tests to run. """ # Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests catalog.build_indexes(filtered_tests=tests) + total_tests = 0 + # Using a set to avoid inserting duplicate tests device_to_tests: defaultdict[AntaDevice, set[AntaTestDefinition]] = defaultdict(set) - # Create AntaTestRunner tuples from the tags - final_tests_count = 0 + # Create the mapping of devices to the tests to run for device in inventory.devices: if tags: if not any(tag in device.tags for tag in tags): @@ -160,40 +238,42 @@ def prepare_tests( # Add the tests with matching tags from device tags device_to_tests[device].update(catalog.get_tests_by_tags(device.tags)) - final_tests_count += len(device_to_tests[device]) + total_tests += len(device_to_tests[device]) - if len(device_to_tests.values()) == 0: + if total_tests == 0: msg = ( f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." 
) logger.warning(msg) - return None + return total_tests, None - return device_to_tests + return total_tests, device_to_tests -def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager) -> list[Coroutine[Any, Any, TestResult]]: +async def test_generator( + selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager +) -> AsyncGenerator[Coroutine[Any, Any, TestResult], None]: """Get the coroutines for the ANTA run. + It creates an async generator of coroutines which are created by the `test` method of the AntaTest instances. Each coroutine is a test to run. + Parameters ---------- selected_tests - A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function. + A mapping of devices to the tests to run. The selected tests are created by the `setup_tests` function. manager A ResultManager - Returns - ------- - list[Coroutine[Any, Any, TestResult]] - The list of coroutines to run. + Yields + ------ + The coroutine (test) to run. """ - coros = [] for device, test_definitions in selected_tests.items(): for test in test_definitions: try: test_instance = test.test(device=device, inputs=test.inputs) manager.add(test_instance.result) - coros.append(test_instance.test()) + coroutine = test_instance.test() except Exception as e: # noqa: PERF203, BLE001 # An AntaTest instance is potentially user-defined code. # We need to catch everything and exit gracefully with an error message. 
@@ -204,7 +284,8 @@ def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinitio ], ) anta_log_exception(e, message, logger) - return coros + else: + yield coroutine @cprofile() @@ -246,6 +327,9 @@ async def main( # noqa: PLR0913 # Adjust the maximum number of open file descriptors for the ANTA process limits = adjust_rlimit_nofile() + # Adjust the maximum number of tests to run concurrently + max_concurrency = adjust_max_concurrency() + if not catalog.tests: logger.info("The list of tests is empty, exiting") return @@ -257,40 +341,54 @@ async def main( # noqa: PLR0913 return with Catchtime(logger=logger, message="Preparing the tests"): - selected_tests = prepare_tests(selected_inventory, catalog, tests, tags) - if selected_tests is None: + total_tests, selected_tests = setup_tests(selected_inventory, catalog, tests, tags) + if total_tests == 0 or selected_tests is None: return final_tests_count = sum(len(tests) for tests in selected_tests.values()) + generator = test_generator(selected_tests, manager) + run_info = ( - "--- ANTA NRFU Run Information ---\n" + "------------------------------------ ANTA NRFU Run Information -------------------------------------\n" f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n" - f"Total number of selected tests: {final_tests_count}\n" + f"Total number of selected tests: {total_tests}\n" + f"Maximum number of tests to run concurrently: {max_concurrency}\n" + f"Maximum number of connections per device: {DEFAULT_MAX_CONNECTIONS}\n" f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n" - "---------------------------------" + "----------------------------------------------------------------------------------------------------" ) logger.info(run_info) - if final_tests_count > limits[0]: + total_potential_connections = len(selected_inventory) * DEFAULT_MAX_CONNECTIONS + + if total_tests > max_concurrency: + logger.warning( + "The total number of tests is 
higher than the maximum number of tests to run concurrently.\n" + "ANTA will be throttled to run at the maximum number of tests to run concurrently to ensure system stability.\n" + "Please consult the ANTA FAQ." + ) + if total_potential_connections > limits[0]: logger.warning( - "The number of concurrent tests is higher than the open file descriptors limit for this ANTA process.\n" + "The total potential connections to devices is higher than the open file descriptors limit for this ANTA process.\n" "Errors may occur while running the tests.\n" "Please consult the ANTA FAQ." ) - coroutines = get_coroutines(selected_tests, manager) + # Cleanup no longer needed objects before running the tests + del selected_tests if dry_run: logger.info("Dry-run mode, exiting before running the tests.") - for coro in coroutines: - coro.close() + async for test in generator: + test.close() return if AntaTest.progress is not None: - AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coroutines)) + AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=final_tests_count) with Catchtime(logger=logger, message="Running ANTA tests"): - await asyncio.gather(*coroutines) + async for result in run(generator, limit=max_concurrency): + logger.debug(result) log_cache_statistics(selected_inventory.devices) diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py index b80259cc3..ebf46c01f 100644 --- a/tests/units/test_runner.py +++ b/tests/units/test_runner.py @@ -16,7 +16,7 @@ from anta.catalog import AntaCatalog from anta.inventory import AntaInventory from anta.result_manager import ResultManager -from anta.runner import adjust_rlimit_nofile, main, prepare_tests +from anta.runner import adjust_rlimit_nofile, main, setup_tests from .test_models import FakeTest, FakeTestWithMissingTest @@ -141,13 +141,13 @@ def side_effect_setrlimit(resource_id: int, limits: tuple[int, int]) -> None: ], indirect=["inventory"], ) -async def 
test_prepare_tests( +async def test_setup_tests( caplog: pytest.LogCaptureFixture, inventory: AntaInventory, tags: set[str], tests: set[str], devices_count: int, tests_count: int ) -> None: - """Test the runner prepare_tests function with specific tests.""" + """Test the runner setup_tests function with specific tests.""" caplog.set_level(logging.WARNING) catalog: AntaCatalog = AntaCatalog.parse(str(DATA_DIR / "test_catalog_with_tags.yml")) - selected_tests = prepare_tests(inventory=inventory, catalog=catalog, tags=tags, tests=tests) + total_tests, selected_tests = setup_tests(inventory=inventory, catalog=catalog, tags=tags, tests=tests) if selected_tests is None: msg = f"There are no tests matching the tags {tags} to run in the current test catalog and device inventory, please verify your inputs." assert msg in caplog.messages @@ -155,6 +155,7 @@ async def test_prepare_tests( assert selected_tests is not None assert len(selected_tests) == devices_count assert sum(len(tests) for tests in selected_tests.values()) == tests_count + assert total_tests == tests_count async def test_dry_run(caplog: pytest.LogCaptureFixture, inventory: AntaInventory) -> None: From 7ac03587c3586a43aa48c9861e595ec6a431176e Mon Sep 17 00:00:00 2001 From: gmuloc Date: Mon, 4 Nov 2024 23:30:37 +0100 Subject: [PATCH 02/13] Refactor: Fix github merge conflict resolution --- anta/runner.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/anta/runner.py b/anta/runner.py index ebf25914e..78a65ae1e 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any from anta import GITHUB_SUGGESTION +from anta.cli.console import console from anta.logger import anta_log_exception, exc_to_str from anta.models import AntaTest from anta.tools import Catchtime, cprofile @@ -46,6 +47,7 @@ def adjust_max_concurrency() -> int: Returns ------- + int The maximum number of tests to run concurrently. 
""" try: @@ -65,6 +67,7 @@ def adjust_rlimit_nofile() -> tuple[int, int]: Returns ------- + tuple[int, int] The new soft and hard limits for open file descriptors. """ try: @@ -172,6 +175,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic Returns ------- + AntaInventory | None The filtered AntaInventory or None if there are no devices to run tests on. """ if len(inventory) == 0: @@ -215,6 +219,7 @@ def setup_tests( Returns ------- + tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None] The total number of tests and a mapping of devices to the tests to run or None if there are no tests to run. """ # Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests @@ -243,13 +248,11 @@ def setup_tests( total_test_count += len(device_to_tests[device]) if total_test_count == 0: - msg = ( - f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." - ) + msg = f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." 
logger.warning(msg) - return total_tests, None + return total_test_count, None - return total_tests, device_to_tests + return total_test_count, device_to_tests async def test_generator( @@ -350,14 +353,17 @@ async def main( # noqa: PLR0913 generator = test_generator(selected_tests, manager) + # TODO: 34 is a magic numbers from RichHandler formatting catering for date, level and path + width = min(int(console.width) - 34, len("Maximum number of open file descriptors for the current ANTA process: 0000000000\n")) + run_info = ( - "------------------------------------ ANTA NRFU Run Information -------------------------------------\n" + f"{' ANTA NRFU Run Information ':-^{width}}\n" f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n" f"Total number of selected tests: {total_tests}\n" f"Maximum number of tests to run concurrently: {max_concurrency}\n" f"Maximum number of connections per device: {DEFAULT_MAX_CONNECTIONS}\n" f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n" - "----------------------------------------------------------------------------------------------------" + f"{'':-^{width}}" ) logger.info(run_info) From 96da6efa545b7dd612ed7e7b81e0ac68097bc496 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 4 Nov 2024 22:32:45 +0000 Subject: [PATCH 03/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- anta/runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/anta/runner.py b/anta/runner.py index 78a65ae1e..d0680cb16 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -248,7 +248,9 @@ def setup_tests( total_test_count += len(device_to_tests[device]) if total_test_count == 0: - msg = f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." 
+ msg = ( + f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." + ) logger.warning(msg) return total_test_count, None From f2f3c6a8af5dafc7e5de73bb4cca4c7966d32bfb Mon Sep 17 00:00:00 2001 From: gmuloc Date: Tue, 5 Nov 2024 00:15:41 +0100 Subject: [PATCH 04/13] test: Temporarily disable one benchmark --- tests/benchmark/test_runner.py | 43 +++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/tests/benchmark/test_runner.py b/tests/benchmark/test_runner.py index b020a85d0..0ae848527 100644 --- a/tests/benchmark/test_runner.py +++ b/tests/benchmark/test_runner.py @@ -7,8 +7,7 @@ from typing import TYPE_CHECKING -from anta.result_manager import ResultManager -from anta.runner import get_coroutines, prepare_tests +from anta.runner import setup_tests if TYPE_CHECKING: from collections import defaultdict @@ -20,29 +19,35 @@ from anta.inventory import AntaInventory -def test_prepare_tests(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventory: AntaInventory) -> None: - """Benchmark `anta.runner.prepare_tests`.""" +def test_setup_tests(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventory: AntaInventory) -> None: + """Benchmark `anta.runner.setup_tests`.""" - def _() -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None: + def _() -> tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None]: catalog.clear_indexes() - return prepare_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) + return setup_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) - selected_tests = benchmark(_) + result = benchmark(_) + assert result is not None + count, selected_tests = result assert selected_tests is not None assert len(selected_tests) == len(inventory) assert sum(len(tests) for tests in selected_tests.values()) == len(inventory) * len(catalog.tests) -def test_get_coroutines(benchmark: 
BenchmarkFixture, catalog: AntaCatalog, inventory: AntaInventory) -> None: - """Benchmark `anta.runner.get_coroutines`.""" - selected_tests = prepare_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) - - assert selected_tests is not None - - coroutines = benchmark(lambda: get_coroutines(selected_tests=selected_tests, manager=ResultManager())) - for coros in coroutines: - coros.close() - - count = sum(len(tests) for tests in selected_tests.values()) - assert count == len(coroutines) +# ruff: noqa: ERA001 +# TODO: see what can be done with this benchmark +# def test_get_coroutines(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventory: AntaInventory) -> None: +# """Benchmark `anta.runner.get_coroutines`.""" +# result = setup_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) +# assert result is not None +# count, selected_tests = result +# +# assert selected_tests is not None +# +# coroutines = benchmark(lambda: get_coroutines(selected_tests=selected_tests, manager=ResultManager())) +# for coros in coroutines: +# coros.close() +# +# count = sum(len(tests) for tests in selected_tests.values()) +# assert count == len(coroutines) From b91715e69fe5cdd146bf431dd9c09e9b28d561a4 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Wed, 6 Nov 2024 18:34:03 -0500 Subject: [PATCH 05/13] Added HTTPX timeouts and limits --- anta/cli/exec/utils.py | 2 +- anta/device.py | 130 +++++++++++++++++--- anta/runner.py | 204 +++++++++++++++++++++++--------- docs/advanced_usages/scaling.md | 9 ++ mkdocs.yml | 1 + tests/benchmark/test_runner.py | 14 +-- tests/units/test_device.py | 10 +- tests/units/test_runner.py | 9 +- 8 files changed, 282 insertions(+), 97 deletions(-) create mode 100644 docs/advanced_usages/scaling.md diff --git a/anta/cli/exec/utils.py b/anta/cli/exec/utils.py index 33a02220c..272871a32 100644 --- a/anta/cli/exec/utils.py +++ b/anta/cli/exec/utils.py @@ -154,7 +154,7 @@ async def collect(device: AntaDevice) -> None: ) 
logger.warning("Configuring 'aaa authorization exec default local' on device %s", device.name) command = AntaCommand(command="show running-config | include aaa authorization exec default local", ofmt="text") - await device._session.cli(commands=commands) + await device._client.cli(commands=commands) logger.info("Configured 'aaa authorization exec default local' on device %s", device.name) logger.debug("'aaa authorization exec default local' is already configured on device %s", device.name) diff --git a/anta/device.py b/anta/device.py index d7d2b0de2..8d7b9fa75 100644 --- a/anta/device.py +++ b/anta/device.py @@ -7,6 +7,7 @@ import asyncio import logging +import os from abc import ABC, abstractmethod from collections import defaultdict from typing import TYPE_CHECKING, Any, Literal @@ -16,7 +17,7 @@ from aiocache import Cache from aiocache.plugins import HitMissRatioPlugin from asyncssh import SSHClientConnection, SSHClientConnectionOptions -from httpx import ConnectError, HTTPError, TimeoutException +from httpx import ConnectError, HTTPError, Limits, Timeout, TimeoutException import asynceapi from anta import __DEBUG__ @@ -33,6 +34,78 @@ # https://github.com/pyca/cryptography/issues/7236#issuecomment-1131908472 CLIENT_KEYS = asyncssh.public_key.load_default_keypairs() +ANTA_DEFAULT_TIMEOUT = 30.0 + + +def get_httpx_limits() -> Limits: + """Adjust the underlying HTTPX client resource limits. + + The limits are set using the following environment variables: + - ANTA_MAX_CONNECTIONS: Maximum number of allowable connections. + - ANTA_MAX_KEEPALIVE_CONNECTIONS: Number of allowable keep-alive connections. + - ANTA_KEEPALIVE_EXPIRY: Time limit on idle keep-alive connections in seconds. + + If any environment variable is not set or is invalid, the following HTTPX default limits are used: + - max_connections: 100 + - max_keepalive_connections: 20 + - keepalive_expiry: 5.0 + + These limits are set for all devices. 
+ + Returns + ------- + Limits + HTTPX Limits object with configured connection limits. + + TODO: HTTPX supports None to disable limits. This is not implemented yet. + """ + try: + max_connections = int(os.environ.get("ANTA_MAX_CONNECTIONS", 100)) + max_keepalive_connections = int(os.environ.get("ANTA_MAX_KEEPALIVE_CONNECTIONS", 20)) + keepalive_expiry = float(os.environ.get("ANTA_KEEPALIVE_EXPIRY", 5.0)) + except ValueError as exc: + default_limits = Limits(max_connections=100, max_keepalive_connections=20, keepalive_expiry=5.0) + logger.warning("Error parsing HTTPX resource limits from environment variables: %s\nDefaults to %s", exc, default_limits) + return default_limits + return Limits(max_connections=max_connections, max_keepalive_connections=max_keepalive_connections, keepalive_expiry=keepalive_expiry) + + +def get_httpx_timeout(timeout: float | None) -> Timeout: + """Adjust the underlying HTTPX client timeout. + + The timeouts are set using the following environment variables: + - ANTA_CONNECT_TIMEOUT: Maximum amount of time to wait until a socket connection to the requested host is established. + - ANTA_READ_TIMEOUT: Maximum duration to wait for a chunk of data to be received (for example, a chunk of the response body). + - ANTA_WRITE_TIMEOUT: Maximum duration to wait for a chunk of data to be sent (for example, a chunk of the request body). + - ANTA_POOL_TIMEOUT: Maximum duration to wait for acquiring a connection from the connection pool. + + If any environment variable is not set or is invalid, the provided timeout value is used. + If no timeout is provided, 30 seconds is used which is the default when running ANTA. + + Parameters + ---------- + timeout : float | None + Global timeout value in seconds. Used if specific timeouts are not set. + + Returns + ------- + Timeout + HTTPX Timeout object with configured timeout values. + + TODO: HTTPX supports None to disable timeouts. This is not implemented yet. 
+ """ + timeout = timeout if timeout is not None else ANTA_DEFAULT_TIMEOUT + try: + connect = float(os.environ.get("ANTA_CONNECT_TIMEOUT", timeout)) + read = float(os.environ.get("ANTA_READ_TIMEOUT", timeout)) + write = float(os.environ.get("ANTA_WRITE_TIMEOUT", timeout)) + pool = float(os.environ.get("ANTA_POOL_TIMEOUT", timeout)) + except ValueError as exc: + default_timeout = Timeout(timeout=timeout) + logger.warning("Error parsing HTTPX timeouts from environment variables: %s\nDefaults to %s", exc, default_timeout) + return default_timeout + return Timeout(connect=connect, read=read, write=write, pool=pool) + class AntaDevice(ABC): """Abstract class representing a device in ANTA. @@ -263,7 +336,7 @@ def __init__( name: str | None = None, enable_password: str | None = None, port: int | None = None, - ssh_port: int | None = 22, + ssh_port: int = 22, tags: set[str] | None = None, timeout: float | None = None, proto: Literal["http", "https"] = "https", @@ -321,13 +394,28 @@ def __init__( raise ValueError(message) self.enable = enable self._enable_password = enable_password - self._session: asynceapi.Device = asynceapi.Device(host=host, port=port, username=username, password=password, proto=proto, timeout=timeout) - ssh_params: dict[str, Any] = {} - if insecure: - ssh_params["known_hosts"] = None - self._ssh_opts: SSHClientConnectionOptions = SSHClientConnectionOptions( - host=host, port=ssh_port, username=username, password=password, client_keys=CLIENT_KEYS, **ssh_params - ) + + # Create the async eAPI client + self._client = self._create_asynceapi_client(host, port, username, password, proto, timeout) + + # Create the SSH connection options + self._ssh_opts = self._create_ssh_options(host, ssh_port, username, password, insecure=insecure) + + def _create_asynceapi_client( + self, host: str, port: int | None, username: str, password: str, proto: Literal["http", "https"], timeout: float | None + ) -> asynceapi.Device: + """Create the asynceapi client with the 
provided parameters.""" + # Get resource limits and timeout values from environment variables or use default values + client_limits = get_httpx_limits() + client_timeout = get_httpx_timeout(timeout) + + return asynceapi.Device(host=host, port=port, username=username, password=password, proto=proto, timeout=client_timeout, limits=client_limits) + + def _create_ssh_options(self, host: str, port: int, username: str, password: str, *, insecure: bool) -> SSHClientConnectionOptions: + """Create the SSH connection options with the provided parameters.""" + ssh_params = {"known_hosts": None} if insecure else {} + + return SSHClientConnectionOptions(host=host, port=port, username=username, password=password, client_keys=CLIENT_KEYS, **ssh_params) def __rich_repr__(self) -> Iterator[tuple[str, Any]]: """Implement Rich Repr Protocol. @@ -335,8 +423,8 @@ def __rich_repr__(self) -> Iterator[tuple[str, Any]]: https://rich.readthedocs.io/en/stable/pretty.html#rich-repr-protocol. """ yield from super().__rich_repr__() - yield ("host", self._session.host) - yield ("eapi_port", self._session.port) + yield ("host", self._client.host) + yield ("eapi_port", self._client.port) yield ("username", self._ssh_opts.username) yield ("enable", self.enable) yield ("insecure", self._ssh_opts.known_hosts is None) @@ -345,7 +433,7 @@ def __rich_repr__(self) -> Iterator[tuple[str, Any]]: removed_pw = "" _ssh_opts["password"] = removed_pw _ssh_opts["kwargs"]["password"] = removed_pw - yield ("_session", vars(self._session)) + yield ("_client", vars(self._client)) yield ("_ssh_opts", _ssh_opts) def __repr__(self) -> str: @@ -357,8 +445,8 @@ def __repr__(self) -> str: f"is_online={self.is_online!r}, " f"established={self.established!r}, " f"disable_cache={self.cache is None!r}, " - f"host={self._session.host!r}, " - f"eapi_port={self._session.port!r}, " + f"host={self._client.host!r}, " + f"eapi_port={self._client.port!r}, " f"username={self._ssh_opts.username!r}, " f"enable={self.enable!r}, " 
f"insecure={self._ssh_opts.known_hosts is None!r})" @@ -370,7 +458,7 @@ def _keys(self) -> tuple[Any, ...]: This covers the use case of port forwarding when the host is localhost and the devices have different ports. """ - return (self._session.host, self._session.port) + return (self._client.host, self._client.port) async def _collect(self, command: AntaCommand, *, collection_id: str | None = None) -> None: # noqa: C901 function is too complex - because of many required except blocks """Collect device command output from EOS using aio-eapi. @@ -399,7 +487,7 @@ async def _collect(self, command: AntaCommand, *, collection_id: str | None = No commands.append({"cmd": "enable"}) commands += [{"cmd": command.command, "revision": command.revision}] if command.revision else [{"cmd": command.command}] try: - response: list[dict[str, Any] | str] = await self._session.cli( + response: list[dict[str, Any] | str] = await self._client.cli( commands=commands, ofmt=command.ofmt, version=command.version, @@ -421,9 +509,13 @@ async def _collect(self, command: AntaCommand, *, collection_id: str | None = No except TimeoutException as e: # This block catches Timeout exceptions. command.errors = [exc_to_str(e)] - timeouts = self._session.timeout.as_dict() + timeouts = self._client.timeout.as_dict() logger.error( - "%s occurred while sending a command to %s. Consider increasing the timeout.\nCurrent timeouts: Connect: %s | Read: %s | Write: %s | Pool: %s", + "%s occurred while sending a command to %s.\n" + "Current timeouts: Connect: %s | Read: %s | Write: %s | Pool: %s\n" + "You can either increase the global timeout or configure specific timeout behaviors. 
" + "See Scaling ANTA documentation for details: " + "https://anta.arista.com/stable/advanced_usages/scaling/", exc_to_str(e), self.name, timeouts["connect"], @@ -455,7 +547,7 @@ async def refresh(self) -> None: - hw_model: The hardware model of the device """ logger.debug("Refreshing device %s", self.name) - self.is_online = await self._session.check_connection() + self.is_online = await self._client.check_connection() if self.is_online: show_version = AntaCommand(command="show version") await self._collect(show_version) diff --git a/anta/runner.py b/anta/runner.py index d0680cb16..e3d2a6483 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -11,16 +11,18 @@ import resource from collections import defaultdict from typing import TYPE_CHECKING, Any +from warnings import warn from anta import GITHUB_SUGGESTION from anta.cli.console import console +from anta.device import get_httpx_limits from anta.logger import anta_log_exception, exc_to_str from anta.models import AntaTest from anta.tools import Catchtime, cprofile if TYPE_CHECKING: from asyncio import Task - from collections.abc import AsyncGenerator, Coroutine + from collections.abc import AsyncGenerator, Coroutine, Iterator from anta.catalog import AntaCatalog, AntaTestDefinition from anta.device import AntaDevice @@ -34,11 +36,66 @@ """Default number of open file descriptors for the ANTA process.""" DEFAULT_MAX_CONCURRENCY = 10000 """Default maximum number of tests to run concurrently.""" -DEFAULT_MAX_CONNECTIONS = 100 -"""Default underlying HTTPX client maximum number of connections per device.""" -def adjust_max_concurrency() -> int: +def log_run_information( + device_count: tuple[int, int], + test_count: int, + max_concurrency: int, + max_connections: int | None, + file_descriptor_limit: int, +) -> None: + """Log ANTA run information and potential resource limit warnings. + + Parameters + ---------- + device_count : tuple[int, int] + Total number of devices in inventory and number of established devices. 
+ test_count : int + Total number of tests to run. + max_concurrency : int + Maximum number of concurrent tests. + max_connections : int | None + Maximum connections per device. None means unlimited. + file_descriptor_limit : int + System file descriptor limit. + """ + # TODO: 34 is a magic numbers from RichHandler formatting catering for date, level and path + width = min(int(console.width) - 34, len("Maximum number of open file descriptors for the current ANTA process: 0000000000\n")) + + devices_total, devices_established = device_count + + run_info = ( + f"{' ANTA NRFU Run Information ':-^{width}}\n" + f"Devices: {devices_total} total, {devices_established} established\n" + f"Tests: {test_count} total\n" + f"Limits:\n" + f" Max concurrent tests: {max_concurrency}\n" + f" Max connections per device: {"Unlimited" if max_connections is None else max_connections}\n" + f" Max file descriptors: {file_descriptor_limit}\n" + f"{'':-^{width}}" + ) + + logger.info(run_info) + + # Log warnings for potential resource limits + if test_count > max_concurrency: + logger.warning("Tests count (%s) exceeds concurrent limit (%s). Tests will be throttled. See Scaling ANTA documentation.", test_count, max_concurrency) + + if max_connections is None: + logger.warning( + "Running with unlimited HTTP connections. Connection errors may occur due to file descriptor limit (%s). See Scaling ANTA documentation.", + file_descriptor_limit, + ) + elif devices_established * max_connections > file_descriptor_limit: + logger.warning( + "Potential connections (%s) exceeds file descriptor limit (%s). Connection errors may occur. See Scaling ANTA documentation.", + devices_established * max_connections, + file_descriptor_limit, + ) + + +def get_max_concurrency() -> int: """Adjust the maximum number of tests (coroutines) to run concurrently. The limit is set to the value of the ANTA_MAX_CONCURRENCY environment variable. 
@@ -54,12 +111,12 @@ def adjust_max_concurrency() -> int: max_concurrency = int(os.environ.get("ANTA_MAX_CONCURRENCY", DEFAULT_MAX_CONCURRENCY)) except ValueError as exception: logger.warning("The ANTA_MAX_CONCURRENCY environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_MAX_CONCURRENCY) - max_concurrency = DEFAULT_MAX_CONCURRENCY + return DEFAULT_MAX_CONCURRENCY return max_concurrency def adjust_rlimit_nofile() -> tuple[int, int]: - """Adjust the maximum number of open file descriptors for the ANTA process. + """Get the maximum number of open file descriptors for the ANTA process. The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable. @@ -201,10 +258,10 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic return selected_inventory -def setup_tests( +def prepare_tests( inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None -) -> tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None]: - """Set up the tests for the ANTA run. +) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None: + """Prepare the tests to run. Parameters ---------- @@ -219,8 +276,8 @@ def setup_tests( Returns ------- - tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None] - The total number of tests and a mapping of devices to the tests to run or None if there are no tests to run. + defaultdict[AntaDevice, set[AntaTestDefinition]] | None + A mapping of devices to the tests to run or None if there are no tests to run. """ # Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests catalog.build_indexes(filtered_tests=tests) @@ -252,22 +309,21 @@ def setup_tests( f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." 
) logger.warning(msg) - return total_test_count, None + return None - return total_test_count, device_to_tests + return device_to_tests -async def test_generator( - selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager -) -> AsyncGenerator[Coroutine[Any, Any, TestResult], None]: - """Get the coroutines for the ANTA run. +def _generate_test_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager) -> Iterator[Coroutine[Any, Any, TestResult]]: + """Generate test coroutines from selected tests for the ANTA run. - It creates an async generator of coroutines which are created by the `test` method of the AntaTest instances. Each coroutine is a test to run. + Internal function that creates the test coroutines. Used by both + `generate_test_coroutines` and `get_coroutines` functions. Parameters ---------- selected_tests - A mapping of devices to the tests to run. The selected tests are created by the `setup_tests` function. + A mapping of devices to the tests to run. manager A ResultManager @@ -281,7 +337,7 @@ async def test_generator( test_instance = test.test(device=device, inputs=test.inputs) manager.add(test_instance.result) coroutine = test_instance.test() - except Exception as e: # noqa: PERF203, BLE001 + except Exception as e: # noqa: BLE001, PERF203 # An AntaTest instance is potentially user-defined code. # We need to catch everything and exit gracefully with an error message. message = "\n".join( @@ -295,6 +351,57 @@ async def test_generator( yield coroutine +async def generate_test_coroutines( + selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager +) -> AsyncGenerator[Coroutine[Any, Any, TestResult], None]: + """Generate test coroutines from selected tests for the ANTA run. + + It creates an async generator of coroutines which are created by the `test` method of the AntaTest instances. Each coroutine is a test to run. 
+ + Parameters + ---------- + selected_tests + A mapping of devices to the tests to run. The selected tests are created by the `prepare_tests` function. + manager + A ResultManager + + Yields + ------ + The coroutine (test) to run. + """ + for coroutine in _generate_test_coroutines(selected_tests, manager): + yield coroutine + + +def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager) -> list[Coroutine[Any, Any, TestResult]]: + """Get the coroutines for the ANTA run. + + Warning + ------- + This function is deprecated and no longer used by the runner as it now uses a generator created by the `test_generator` function of this module. + Will be removed in ANTA v2.0 + + Parameters + ---------- + selected_tests + A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function. + manager + A ResultManager + + Returns + ------- + list[Coroutine[Any, Any, TestResult]] + The list of coroutines to run. + """ + # TODO: Remove this function in ANTA v2.0 + warn( + message="`get_coroutines` is deprecated and no longer used by the runner. Use `test_generator` instead. 
Will be removed in ANTA v2.0.", + category=DeprecationWarning, + stacklevel=2, + ) + return list(_generate_test_coroutines(selected_tests, manager)) + + @cprofile() async def main( # noqa: PLR0913 manager: ResultManager, @@ -334,8 +441,11 @@ async def main( # noqa: PLR0913 # Adjust the maximum number of open file descriptors for the ANTA process limits = adjust_rlimit_nofile() - # Adjust the maximum number of tests to run concurrently - max_concurrency = adjust_max_concurrency() + # Get the maximum number of tests to run concurrently + max_concurrency = get_max_concurrency() + + # Get the maximum number of connections per device + max_connections = get_httpx_limits().max_connections if not catalog.tests: logger.info("The list of tests is empty, exiting") @@ -347,49 +457,25 @@ async def main( # noqa: PLR0913 if selected_inventory is None: return - with Catchtime(logger=logger, message="Preparing the tests"): - total_tests, selected_tests = setup_tests(selected_inventory, catalog, tests, tags) - if total_tests == 0 or selected_tests is None: + with Catchtime(logger=logger, message="Preparing Tests"): + selected_tests = prepare_tests(selected_inventory, catalog, tests, tags) + if selected_tests is None: return final_tests_count = sum(len(tests) for tests in selected_tests.values()) + del catalog # No longer needed - generator = test_generator(selected_tests, manager) - - # TODO: 34 is a magic numbers from RichHandler formatting catering for date, level and path - width = min(int(console.width) - 34, len("Maximum number of open file descriptors for the current ANTA process: 0000000000\n")) + generator = generate_test_coroutines(selected_tests, manager) - run_info = ( - f"{' ANTA NRFU Run Information ':-^{width}}\n" - f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n" - f"Total number of selected tests: {total_tests}\n" - f"Maximum number of tests to run concurrently: {max_concurrency}\n" - f"Maximum number of connections per device: 
{DEFAULT_MAX_CONNECTIONS}\n" - f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n" - f"{'':-^{width}}" + log_run_information( + device_count=(len(inventory), len(selected_inventory)), + test_count=final_tests_count, + max_concurrency=max_concurrency, + max_connections=max_connections, + file_descriptor_limit=limits[0], ) - logger.info(run_info) - - total_potential_connections = len(selected_inventory) * DEFAULT_MAX_CONNECTIONS - - if total_tests > max_concurrency: - logger.warning( - "The total number of tests is higher than the maximum number of tests to run concurrently.\n" - "ANTA will be throttled to run at the maximum number of tests to run concurrently to ensure system stability.\n" - "Please consult the ANTA FAQ." - ) - if total_potential_connections > limits[0]: - logger.warning( - "The total potential connections to devices is higher than the open file descriptors limit for this ANTA process.\n" - "Errors may occur while running the tests.\n" - "Please consult the ANTA FAQ." 
- ) - - # Cleanup no longer needed objects before running the tests - del selected_tests - if dry_run: - logger.info("Dry-run mode, exiting before running the tests.") + logger.info("Dry-run mode, exiting before running tests.") async for test in generator: test.close() return @@ -397,7 +483,7 @@ async def main( # noqa: PLR0913 if AntaTest.progress is not None: AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=final_tests_count) - with Catchtime(logger=logger, message="Running ANTA tests"): + with Catchtime(logger=logger, message="Running Tests"): async for result in run(generator, limit=max_concurrency): logger.debug(result) diff --git a/docs/advanced_usages/scaling.md b/docs/advanced_usages/scaling.md new file mode 100644 index 000000000..8d4482e32 --- /dev/null +++ b/docs/advanced_usages/scaling.md @@ -0,0 +1,9 @@ + + +## Scaling ANTA + +TODO: Add content diff --git a/mkdocs.yml b/mkdocs.yml index 3a321daaf..be9512816 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -179,6 +179,7 @@ nav: - Debug commands: cli/debug.md - Tag Management: cli/tag-management.md - Advanced Usages: + - Scaling ANTA: advanced_usages/scaling.md - Caching in ANTA: advanced_usages/caching.md - Developing ANTA tests: advanced_usages/custom-tests.md - ANTA as a Python Library: advanced_usages/as-python-lib.md diff --git a/tests/benchmark/test_runner.py b/tests/benchmark/test_runner.py index 0ae848527..8a5b41c1d 100644 --- a/tests/benchmark/test_runner.py +++ b/tests/benchmark/test_runner.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING -from anta.runner import setup_tests +from anta.runner import prepare_tests if TYPE_CHECKING: from collections import defaultdict @@ -19,16 +19,14 @@ from anta.inventory import AntaInventory -def test_setup_tests(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventory: AntaInventory) -> None: - """Benchmark `anta.runner.setup_tests`.""" +def test_prepare_tests(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventory: 
AntaInventory) -> None: + """Benchmark `anta.runner.prepare_tests`.""" - def _() -> tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None]: + def _() -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None: catalog.clear_indexes() - return setup_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) + return prepare_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) - result = benchmark(_) - assert result is not None - count, selected_tests = result + selected_tests = benchmark(_) assert selected_tests is not None assert len(selected_tests) == len(inventory) diff --git a/tests/units/test_device.py b/tests/units/test_device.py index faf614481..cdaba34ea 100644 --- a/tests/units/test_device.py +++ b/tests/units/test_device.py @@ -565,11 +565,11 @@ def test__eq(self, device1: dict[str, Any], device2: dict[str, Any], expected: b ) async def test_refresh(self, async_device: AsyncEOSDevice, patch_kwargs: list[dict[str, Any]], expected: dict[str, Any]) -> None: """Test AsyncEOSDevice.refresh().""" - with patch.object(async_device._session, "check_connection", **patch_kwargs[0]), patch.object(async_device._session, "cli", **patch_kwargs[1]): + with patch.object(async_device._client, "check_connection", **patch_kwargs[0]), patch.object(async_device._client, "cli", **patch_kwargs[1]): await async_device.refresh() - async_device._session.check_connection.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.check_connection is patched + async_device._client.check_connection.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.check_connection is patched if expected["is_online"]: - async_device._session.cli.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.cli is patched + async_device._client.cli.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.cli is patched assert async_device.is_online == expected["is_online"] assert async_device.established == 
expected["established"] assert async_device.hw_model == expected["hw_model"] @@ -582,7 +582,7 @@ async def test_refresh(self, async_device: AsyncEOSDevice, patch_kwargs: list[di async def test__collect(self, async_device: AsyncEOSDevice, command: dict[str, Any], expected: dict[str, Any]) -> None: """Test AsyncEOSDevice._collect().""" cmd = AntaCommand(command=command["command"], revision=command["revision"]) if "revision" in command else AntaCommand(command=command["command"]) - with patch.object(async_device._session, "cli", **command["patch_kwargs"]): + with patch.object(async_device._client, "cli", **command["patch_kwargs"]): collection_id = "pytest" await async_device.collect(cmd, collection_id=collection_id) commands: list[dict[str, Any]] = [] @@ -600,7 +600,7 @@ async def test__collect(self, async_device: AsyncEOSDevice, command: dict[str, A commands.append({"cmd": cmd.command, "revision": cmd.revision}) else: commands.append({"cmd": cmd.command}) - async_device._session.cli.assert_called_once_with(commands=commands, ofmt=cmd.ofmt, version=cmd.version, req_id=f"ANTA-{collection_id}-{id(cmd)}") # type: ignore[attr-defined] # asynceapi.Device.cli is patched + async_device._client.cli.assert_called_once_with(commands=commands, ofmt=cmd.ofmt, version=cmd.version, req_id=f"ANTA-{collection_id}-{id(cmd)}") # type: ignore[attr-defined] # asynceapi.Device.cli is patched assert cmd.output == expected["output"] assert cmd.errors == expected["errors"] diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py index 705fa18df..8d19a4d1a 100644 --- a/tests/units/test_runner.py +++ b/tests/units/test_runner.py @@ -16,7 +16,7 @@ from anta.catalog import AntaCatalog from anta.inventory import AntaInventory from anta.result_manager import ResultManager -from anta.runner import adjust_rlimit_nofile, main, setup_tests +from anta.runner import adjust_rlimit_nofile, main, prepare_tests from .test_models import FakeTest, FakeTestWithMissingTest @@ -142,13 +142,13 @@ def 
side_effect_setrlimit(resource_id: int, limits: tuple[int, int]) -> None: ], indirect=["inventory"], ) -async def test_setup_tests( +async def test_prepare_tests( caplog: pytest.LogCaptureFixture, inventory: AntaInventory, tags: set[str], tests: set[str], devices_count: int, tests_count: int ) -> None: - """Test the runner setup_tests function with specific tests.""" + """Test the runner prepare_tests function with specific tests.""" caplog.set_level(logging.WARNING) catalog: AntaCatalog = AntaCatalog.parse(str(DATA_DIR / "test_catalog_with_tags.yml")) - total_tests, selected_tests = setup_tests(inventory=inventory, catalog=catalog, tags=tags, tests=tests) + selected_tests = prepare_tests(inventory=inventory, catalog=catalog, tags=tags, tests=tests) if selected_tests is None: msg = f"There are no tests matching the tags {tags} to run in the current test catalog and device inventory, please verify your inputs." assert msg in caplog.messages @@ -156,7 +156,6 @@ async def test_setup_tests( assert selected_tests is not None assert len(selected_tests) == devices_count assert sum(len(tests) for tests in selected_tests.values()) == tests_count - assert total_tests == tests_count async def test_dry_run(caplog: pytest.LogCaptureFixture, inventory: AntaInventory) -> None: From bfd56d123b0abde915d242a7f16df7a825a07593 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Wed, 6 Nov 2024 18:47:42 -0500 Subject: [PATCH 06/13] Fix docstrings --- anta/runner.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/anta/runner.py b/anta/runner.py index e3d2a6483..8e9e455f5 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -100,7 +100,7 @@ def get_max_concurrency() -> int: The limit is set to the value of the ANTA_MAX_CONCURRENCY environment variable. - If the `ANTA_MAX_CONCURRENCY` environment variable is not set or is invalid, `DEFAULT_MAX_CONCURRENCY` is used. 
+ If the `ANTA_MAX_CONCURRENCY` environment variable is not set or is invalid, `DEFAULT_MAX_CONCURRENCY` is used (10000). Returns ------- @@ -116,11 +116,11 @@ def get_max_concurrency() -> int: def adjust_rlimit_nofile() -> tuple[int, int]: - """Get the maximum number of open file descriptors for the ANTA process. + """Adjust the maximum number of open file descriptors for the ANTA process. The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable. - If the `ANTA_NOFILE` environment variable is not set or is invalid, `DEFAULT_NOFILE` is used. + If the `ANTA_NOFILE` environment variable is not set or is invalid, `DEFAULT_NOFILE` is used (16384). Returns ------- @@ -233,7 +233,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic Returns ------- AntaInventory | None - The filtered AntaInventory or None if there are no devices to run tests on. + The filtered inventory or None if there are no devices to run tests on. """ if len(inventory) == 0: logger.info("The inventory is empty, exiting") @@ -323,7 +323,7 @@ def _generate_test_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTe Parameters ---------- selected_tests - A mapping of devices to the tests to run. + A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function. manager A ResultManager @@ -361,7 +361,7 @@ async def generate_test_coroutines( Parameters ---------- selected_tests - A mapping of devices to the tests to run. The selected tests are created by the `prepare_tests` function. + A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function. 
manager A ResultManager @@ -378,7 +378,7 @@ def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinitio Warning ------- - This function is deprecated and no longer used by the runner as it now uses a generator created by the `test_generator` function of this module. + This function is deprecated and no longer used by the runner as it now uses a generator created by the `generate_test_coroutines` function of this module. Will be removed in ANTA v2.0 Parameters @@ -395,7 +395,7 @@ def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinitio """ # TODO: Remove this function in ANTA v2.0 warn( - message="`get_coroutines` is deprecated and no longer used by the runner. Use `test_generator` instead. Will be removed in ANTA v2.0.", + message="`get_coroutines` is deprecated and no longer used by the runner. Use `generate_test_coroutines` instead. Will be removed in ANTA v2.0.", category=DeprecationWarning, stacklevel=2, ) From d95cb52b450bf2c97c908214dd59a6404a7c96c8 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 7 Nov 2024 01:30:42 -0500 Subject: [PATCH 07/13] Add doc --- docs/advanced_usages/scaling.md | 577 +++++++++++++++++++++++++++++++- 1 file changed, 575 insertions(+), 2 deletions(-) diff --git a/docs/advanced_usages/scaling.md b/docs/advanced_usages/scaling.md index 8d4482e32..440625d99 100644 --- a/docs/advanced_usages/scaling.md +++ b/docs/advanced_usages/scaling.md @@ -4,6 +4,579 @@ ~ that can be found in the LICENSE file. 
--> -## Scaling ANTA +# πŸš€ Scaling ANTA: A Comprehensive Guide -TODO: Add content +**Table of Contents:** + +- [πŸš€ Scaling ANTA: A Comprehensive Guide](#-scaling-anta-a-comprehensive-guide) + - [πŸ“– Introduction](#-introduction) + - [πŸ”„ Single Process Optimization](#-single-process-optimization) + - [Understanding ANTA's Asynchronous Nature](#understanding-antas-asynchronous-nature) + - [Resource Management](#resource-management) + - [Connection Management](#connection-management) + - [Timeouts Configuration](#timeouts-configuration) + - [Performance Tuning](#performance-tuning) + - [πŸ”€ Multi-Process Scaling](#-multi-process-scaling) + - [Prerequisites](#prerequisites) + - [Installation Requirements](#installation-requirements) + - [Installing Required Tools](#installing-required-tools) + - [Scaling Strategies](#scaling-strategies) + - [Device-Based Parallelization](#device-based-parallelization) + - [Tag-Based Parallelization](#tag-based-parallelization) + - [Optimizing Catalog Distribution](#optimizing-catalog-distribution) + - [Implementation](#implementation) + - [Device-Based Implementation](#device-based-implementation) + - [Tag-Based Implementation](#tag-based-implementation) + - [Performance Considerations](#performance-considerations) + - [Parallel Job Control](#parallel-job-control) + - [Results Management](#results-management) + - [πŸŽ‰ Conclusion](#-conclusion) + +## πŸ“– Introduction + +!!! warning "Advanced Guide" + This document covers advanced ANTA configuration and scaling strategies. If you need assistance with specific deployments or optimizations: + + - πŸ“§ Contact your favorite Arista Systems Engineer + - πŸ’» Open an issue on ANTA's [GitHub repository](https://github.com/aristanetworks/anta) + - 🀝 Join the discussion in our community channels + + This guide also assumes you have a basic understanding of ANTA and are familiar with its core concepts. 
If you're new to ANTA, we recommend starting with the [ANTA Getting Started](../getting-started.md) before proceeding. + +ANTA (Arista Network Test Automation) is continually evolving to meet the demands of large-scale network validation. As networks grow in size and complexity, the need for efficient, scalable testing becomes increasingly important. This guide explores comprehensive strategies for scaling ANTA to handle large network fabrics effectively. + +We'll cover two main approaches: + +1. πŸ”„ Single Process Optimization: Leveraging ANTA's built-in asynchronous capabilities with resource management +2. πŸ”€ Multi-Process Scaling: Running multiple ANTA instances in parallel (covered in [Part 2](#-multi-process-scaling)) + +For large fabrics (500+ devices), you may need to combine both approaches for optimal performance. + +## πŸ”„ Single Process Optimization + +### Understanding ANTA's Asynchronous Nature + +ANTA uses Python's `asyncio` library for concurrent execution: + +- πŸ“¦ Each test is a coroutine in the main event loop +- πŸ”„ Tests run concurrently, not in parallel +- βš™οΈ The event loop manages all coroutines and network I/O + +**Key Concept**: While `asyncio` provides excellent concurrency, there are practical limits to how many coroutines can be efficiently managed in a single event loop running on one CPU core. High coroutine counts increase event loop overhead, lead to frequent context switching, consume more memory, and may degrade performance. + +### Resource Management + +ANTA now provides several environment variables to control resource usage: + +!!! note "Default Values" + The values specified in the following examples are the actual default values used by ANTA. See the [Performance Tuning](#performance-tuning) section for guidance on adjusting these values. + +1. **Test Concurrency**: + + ```bash + export ANTA_MAX_CONCURRENCY=10000 + ``` + + This limits the number of concurrent test coroutines to prevent event loop overload. 
ANTA will schedule coroutines up to this limit, then wait for some to complete before scheduling more.
+
+2. **File Descriptors**:
+
+   ```bash
+   export ANTA_NOFILE=16384
+   ```
+
+   Sets the maximum number of open file descriptors for the ANTA process, primarily for handling device connections. At startup, ANTA sets its process's soft limit to the maximum allowed (up to `16384`). This adjustment is necessary because the soft limit is typically set to `1024`, while the hard limit is often higher (system-dependent). If ANTA's hard limit is lower than the number of selected tests in ANTA, the process may request more file descriptors than the operating system allows, causing an error. In such cases, a WARNING is displayed at startup.
+
+   To address this, consider increasing the hard limit for the user starting the ANTA process. You can check the current hard limit for a user by running the command `ulimit -n -H` in the terminal. To set a new limit, create the file `/etc/security/limits.d/10-anta.conf` with the following content:
+
+   ```bash
+   <user> hard nofile <value>
+   ```
+
+   Replace `<user>` with the username used to start the ANTA process, and `<value>` with the desired hard limit. The maximum value will depend on your system. After creating this file, log out of your current session and log back in for the changes to take effect.
+
+### Connection Management
+
+ANTA uses the `httpx` library as its underlying HTTP client for device connections. Each device has its own connection pool with a default maximum of `100` connections. On large fabrics, this can lead to a very high number of connections (n x 100), which may overwhelm the ANTA process.
Connection pooling can be tuned via: + +```bash +# Maximum number of allowable connections +export ANTA_MAX_CONNECTIONS=100 + +# Number of allowable keep-alive connections +export ANTA_MAX_KEEPALIVE_CONNECTIONS=20 + +# Time limit on idle keep-alive connections in seconds +export ANTA_KEEPALIVE_EXPIRY=5.0 +``` + +**Best Practice**: For large fabrics, limiting connections (5-10 per device) has shown optimal performance in testing. `ANTA_MAX_KEEPALIVE_CONNECTIONS` should be lower than `ANTA_MAX_CONNECTIONS`. See [HTTPX documentation](https://www.python-httpx.org/advanced/resource-limits/) for more details. + +### Timeouts Configuration + +ANTA provides several environment variables to control `httpx` timeouts: + +```bash +# Global timeout +export ANTA_TIMEOUT=30.0 + +# Maximum amount of time to wait until a socket connection to the requested host is established +export ANTA_CONNECT_TIMEOUT=30.0 + +# Maximum duration to wait for a chunk of data to be received (for example, a chunk of the response body) +export ANTA_READ_TIMEOUT=30.0 + +# Maximum duration to wait for a chunk of data to be sent (for example, a chunk of the request body) +export ANTA_WRITE_TIMEOUT=30.0 + +# Maximum duration to wait for acquiring a connection from the connection pool +export ANTA_POOL_TIMEOUT=30.0 +``` + +If not set, these values default to the global `timeout` of ANTA, which is `30.0` seconds by default. The global timeout can also be set via the `--timeout` command-line option. See [HTTPX documentation](https://www.python-httpx.org/advanced/timeouts/) for more details. + +### Performance Tuning + +For optimal single-process performance, consider the following tuning parameters: + +1. Adjust concurrency based on test count and system resources: + + ```bash + # For 250 devices with 100 tests each (25,000 total tests) + export ANTA_MAX_CONCURRENCY=25000 + ``` + + Even though the default value is `10000`, it may not be optimal for your system. 
You can increase or decrease this value gradually to find the best setting for your environment. + +2. Optimize connection pooling: + + ```bash + export ANTA_MAX_CONNECTIONS=10 + export ANTA_MAX_KEEPALIVE_CONNECTIONS=5 + ``` + + These values are a good starting point for large fabrics. Adjust them based on your fabric size and performance testing. + +3. Increase timeouts for stability: + + ```bash + export ANTA_TIMEOUT=3600.0 + ``` + + This is **very** important for large fabrics. Increase the global timeout to prevent test failures due to timeouts. You shouldn't need to adjust the other timeout values unless you have specific requirements. + +## πŸ”€ Multi-Process Scaling + +When single-process optimization isn't enough, you can scale ANTA horizontally by running multiple instances in parallel. This approach is particularly useful for large fabrics with 500+ devices. This section will provide a few examples of how to achieve this using the common tool [`GNU Parallel`](https://www.gnu.org/software/parallel/). + +By leveraging `GNU Parallel` and proper test organization, we can significantly reduce execution time and improve overall performance while maintaining accurate test results. + +### Prerequisites + +Before implementing multi-process scaling: + +- πŸ—οΈ Understand your fabric topology +- 🎯 Identify natural test boundaries (e.g., devices, PODs, roles, etc.) +- πŸ“‹ Plan your test catalog(s) accordingly +- πŸ’» Understand basic Linux command-line operations + +### Installation Requirements + +!!! info "System Requirements" + The following procedures and examples were tested on **Ubuntu 22.04**, using `GNU Parallel` version `20241022` and `yq` version `v4.44.3`. Please refer the tool's documentation for specific installation instructions. + +#### Installing Required Tools + +1. Install `GNU Parallel` using the following commands: + + ```bash + apt-get install parallel + ``` + +2. 
Install `yq` using the following commands: + + ```bash + wget https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 -O /usr/bin/yq &&\ + chmod +x /usr/bin/yq + ``` + +3. Confirm the installation by running: + + ```bash + parallel --version + yq --version + ``` + + You should see the version information for both tools if the installation was successful: + + ```bash + (.dev) ~ parallel --version + GNU parallel 20241022 + Copyright (C) 2007-2024 Ole Tange, http://ole.tange.dk and Free Software + Foundation, Inc. + License GPLv3+: GNU GPL version 3 or later + This is free software: you are free to change and redistribute it. + GNU parallel comes with no warranty. + + Web site: https://www.gnu.org/software/parallel + + When using programs that use GNU Parallel to process data for publication + please cite as described in 'parallel --citation'. + + (.dev) ~ yq --version + yq (https://github.com/mikefarah/yq/) version v4.44.3 + ``` + +### Scaling Strategies + +#### Device-Based Parallelization + +In this approach, we run ANTA instances in parallel, with each instance handling a specific device. This strategy is effective when: + +- Each device has unique test requirements, i.e., one catalog per device +- You want to maintain separate results per device +- You need fine-grained control over test execution + +Example inventory structure: + +```yaml +anta_inventory: + hosts: + - host: 172.20.20.101 + name: DC1-SPINE1 + - host: 172.20.20.102 + name: DC1-SPINE2 + # ... 
more devices +``` + +Example test catalog structure (one catalog per device): + +```yaml +# DC1-SPINE2.yml +anta.tests.connectivity: +- VerifyLLDPNeighbors: + neighbors: + - neighbor_device: DC1-LEAF1A + neighbor_port: Ethernet1 + port: Ethernet1 + result_overwrite: + custom_field: 'Local: Ethernet1 - Remote: DC1-LEAF1A Ethernet1' + +# DC1-SPINE1.yml +anta.tests.connectivity: +- VerifyLLDPNeighbors: + neighbors: + - neighbor_device: DC1-LEAF1A + neighbor_port: Ethernet2 + port: Ethernet1 + result_overwrite: + custom_field: 'Local: Ethernet1 - Remote: DC1-LEAF1A Ethernet2' +``` + +#### Tag-Based Parallelization + +This approach runs parallel instances based on device tags, useful when: + +- Devices in the same role share similar test requirements +- You want to test entire PODs or device groups simultaneously +- You need to organize tests by network function or location + +Example inventory structure: + +```yaml +--- +anta_inventory: + hosts: + - host: 172.20.20.101 + name: DC1-SPINE1 + tags: ["SPINE", "DC1"] + + - host: 172.20.20.102 + name: DC1-SPINE2 + tags: ["SPINE", "DC1"] + + - host: 172.20.20.111 + name: DC1-LEAF1A + tags: ["LEAF", "DC1"] + + - host: 172.20.20.112 + name: DC1-LEAF1B + tags: ["LEAF", "DC1"] + + - host: 172.20.20.201 + name: DC2-SPINE1 + tags: ["SPINE", "DC2"] + + - host: 172.20.20.202 + name: DC2-SPINE2 + tags: ["SPINE", "DC2"] + # ... more devices +``` + +Example test catalog structure (one catalog for all devices): + +```yaml +anta.tests.vxlan: + - VerifyVxlan1Interface: + filters: + tags: ["BL", "LEAF"] + - VerifyVxlanConfigSanity: + filters: + tags: ["BL", "LEAF"] + +anta.tests.routing: + generic: + - VerifyRoutingProtocolModel: + model: multi-agent + filters: + tags: ["SPINE"] +``` + +#### Optimizing Catalog Distribution + +For large fabrics, consider: + +1. Creating separate catalogs per device type +2. Organizing catalogs by POD or datacenter +3. 
Using tag-based filtering within catalogs

Benefits:

- Reduced memory usage per ANTA instance
- Faster catalog processing
- More efficient test execution

### Implementation

#### Device-Based Implementation

1. Create the device-specific test runner BASH script (`nrfu.sh`):

   ```bash
   #!/bin/bash

   set -euo pipefail

   exec 2>&1

   usage() {
     echo "Usage: $0 <node_name> [<report_path>]"
     exit 1
   }

   ANTA_INVENTORY="anta_inventory.yml"
   TEST_CATALOGS_PATH="intended/test_catalogs"
   NODE_NAME=${1:-}
   NRFU_REPORT_PATH=${2:-nrfu_reports}

   if [[ -z "$NODE_NAME" ]]; then
     usage
   fi

   anta nrfu \
     -d "$NODE_NAME" \
     -i "$ANTA_INVENTORY" \
     -c "$TEST_CATALOGS_PATH/$NODE_NAME-catalog.yml" \
     json -o "$NRFU_REPORT_PATH/$NODE_NAME.json"
   ```

   Change `ANTA_INVENTORY` and `TEST_CATALOGS_PATH` to match your environment.

2. Create the parallel execution script (`run.sh`):

   ```bash
   #!/bin/bash

   mkdir -p nrfu_reports

   NODE_NAMES=$(yq '.anta_inventory.hosts[].name' < "anta_inventory.yml" | tr -d '"')

   NRFU_STATUS=0
   parallel --tag './nrfu.sh {} nrfu_reports' ::: $NODE_NAMES || NRFU_STATUS=$?

   exit $NRFU_STATUS
   ```

   Also change `anta_inventory.yml` to match your inventory file specified in `ANTA_INVENTORY` of `nrfu.sh`.

3. Make both scripts executable:

   ```bash
   chmod +x nrfu.sh run.sh
   ```

4. Run the script:

   ```bash
   ./run.sh
   ```

5. The script will execute ANTA tests for each device in parallel, generating a JSON report for each device in the `nrfu_reports` directory.

#### Tag-Based Implementation

1. 
Create the tag-specific test runner (`nrfu-tag.sh`):

   ```bash
   #!/bin/bash
   set -euo pipefail

   exec 2>&1

   usage() {
     echo "Usage: $0 <tag_name> [<report_path>]"
     exit 1
   }

   ANTA_INVENTORY="anta_inventory.yml"
   ANTA_CATALOG="anta_catalog.yml"
   TAG_NAME=${1:-}
   NRFU_REPORT_PATH=${2:-nrfu_reports}

   if [[ -z "$TAG_NAME" ]]; then
     usage
   fi

   anta nrfu \
     --tags "$TAG_NAME" \
     -i "$ANTA_INVENTORY" \
     -c "$ANTA_CATALOG" \
     json -o "$NRFU_REPORT_PATH/${TAG_NAME}.json"
   ```

   Same as before, adjust `ANTA_INVENTORY`, `ANTA_CATALOG` to match your environment.

2. Create the parallel execution script (`run-tag.sh`):

   ```bash
   #!/bin/bash

   mkdir -p nrfu_reports

   TAGS=$(yq -r '.. | select(.filters?) | .filters.tags[]' < "anta_catalog.yml" | sort -u)

   NRFU_STATUS=0
   parallel --tag './nrfu-tag.sh {} nrfu_reports' ::: $TAGS || NRFU_STATUS=$?

   exit $NRFU_STATUS
   ```

   Change `anta_catalog.yml` to match your catalog file specified in `ANTA_CATALOG` of `nrfu-tag.sh`.

3. Make both scripts executable:

   ```bash
   chmod +x nrfu-tag.sh run-tag.sh
   ```

4. Run the script:

   ```bash
   ./run-tag.sh
   ```

5. The script will execute ANTA tests for each tag in parallel, generating a JSON report for each tag in the `nrfu_reports` directory.

### Performance Considerations

#### Parallel Job Control

The `-j` option in `GNU Parallel` controls the number of concurrent jobs. Consider these factors when setting the job limit:

1. **Available CPU cores**:

   - Recommend: jobs = number of cores - 1
   - Example: For 8 cores, use `-j7`

2. **Memory requirements**:

   - Each ANTA instance requires memory
   - Monitor system resources during execution using `htop` or similar tools

### Results Management

ANTA can output results in various formats:

1. JSON format (default in our scripts):

   ```bash
   anta nrfu -d <device> -i <inventory> -c <catalog> json -o <output_file>
   ```

2. 
Markdown format:

   ```bash
   anta nrfu -d <device> -i <inventory> -c <catalog> md-report --md-output <output_file>
   ```

   Please refer to the [ANTA CLI documentation](https://anta.arista.com/stable/cli/nrfu/) for more information on result output formats.

   !!! note "Output Formats"
       The scripts provided in this guide output JSON files for each device or tag. You can modify them to output in other formats as needed.

3. Optional: You can merge the JSON results and generate a JUnit report using the following Python script (requires `junitparser`):

   ```python
   from pathlib import Path

   from junitparser import TestCase, TestSuite, JUnitXml, Skipped, Error, Failure
   import json

   xml = JUnitXml()

   for report_path in Path("nrfu_reports").glob("*.json"):
       node_name = report_path.stem
       test_case_results = {}

       print(f"Processing {report_path}")
       with open(report_path, mode="r", encoding="utf-8") as fd:
           nrfu_report = json.load(fd)

       for report_idx, report in enumerate(nrfu_report):
           report_test = report["test"]
           test_case = test_case_results.setdefault(report_test, [])

           report_result = report["result"]
           if report_result == "success":
               continue

           report_message = ",".join(report["messages"])

           result_cls = {
               "error": Error,
               "skipped": Skipped,
               "failure": Failure,
           }.get(report_result)
           test_case.append(result_cls(message=report_message))

       # Add suite to JunitXml
       node_suite = TestSuite(node_name)
       test_cases = []
       n_tests = 0
       for test_name, test_results in test_case_results.items():
           test_case = TestCase(name=test_name, classname=f"{node_name}")
           test_case.result = test_results
           n_tests += len(test_results)
           test_cases.append(test_case)
       node_suite.add_testcases(test_cases)
       xml.add_testsuite(node_suite)

   xml.write("nrfu.xml", pretty=True)
   ```

## πŸŽ‰ Conclusion

By implementing these scaling strategies, you can efficiently run ANTA tests on large network fabrics. 
The combination of parallel execution, optimized catalogs, and proper resource management ensures fast and reliable network validation. + +Remember to: + +- βœ… Choose the appropriate parallelization strategy +- πŸ“‹ Optimize catalog organization +- πŸ“Š Monitor system resources +- ⚑ Adjust parallel job limits based on your environment + +!!! tip "Need Help?" + Performance tuning can be complex and highly dependent on your specific deployment. Don't hesitate to: + + - πŸ“ž Reach out to your Arista SE for guidance + - πŸ“ Document your specific use case on GitHub + - πŸ” Share your findings with the community From 3dede8b5459a62211512fd3d6fdf935c738e26d7 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 7 Nov 2024 14:47:07 -0500 Subject: [PATCH 08/13] Update doc --- docs/advanced_usages/scaling.md | 55 ++++++++++++++++++++++++++++++++- docs/faq.md | 6 ++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/docs/advanced_usages/scaling.md b/docs/advanced_usages/scaling.md index 440625d99..be1fab906 100644 --- a/docs/advanced_usages/scaling.md +++ b/docs/advanced_usages/scaling.md @@ -29,8 +29,10 @@ - [Tag-Based Implementation](#tag-based-implementation) - [Performance Considerations](#performance-considerations) - [Parallel Job Control](#parallel-job-control) + - [JSON vs YAML](#json-vs-yaml) - [Results Management](#results-management) - [πŸŽ‰ Conclusion](#-conclusion) + - [πŸ“š References](#-references) ## πŸ“– Introduction @@ -492,6 +494,28 @@ The `-j` option in `GNU Parallel` controls the number of concurrent jobs. Consid - Each ANTA instance requires memory - Monitor system resources during execution using `htop` or similar tools +#### JSON vs YAML + +When working with large catalogs and inventories, using JSON format instead of YAML can provide better performance as ANTA loads JSON files more efficiently. 
If you choose to use JSON: + +- Convert your YAML files to JSON (you can use `yq` for this) +- Modify the scripts to use `jq` instead of `yq`: + + ```bash + # For device-based implementation + NODE_NAMES=$(jq -r '.anta_inventory.hosts[].name' < "anta_inventory.json") + + # For tag-based implementation + TAGS=$(jq -r '.. | select(.filters?) | .filters.tags[]' < "anta_catalog.json" | sort -u) + ``` + + Example of converting YAML to JSON: + + ```bash + yq -o=json eval 'anta_inventory.yaml' > anta_inventory.json + yq -o=json eval 'anta_catalog.yaml' > anta_catalog.json + ``` + ### Results Management ANTA can output results in various formats: @@ -563,6 +587,9 @@ ANTA can output results in various formats: xml.write("nrfu.xml", pretty=True) ``` + !!! tip "CI/CD Integration" + This report format can be used with CI/CD tools like Jenkins, GitLab, or GitHub Actions. + ## πŸŽ‰ Conclusion By implementing these scaling strategies, you can efficiently run ANTA tests on large network fabrics. The combination of parallel execution, optimized catalogs, and proper resource management ensures fast and reliable network validation. @@ -578,5 +605,31 @@ Remember to: Performance tuning can be complex and highly dependent on your specific deployment. 
Don't hesitate to: - πŸ“ž Reach out to your Arista SE for guidance - - πŸ“ Document your specific use case on GitHub + - πŸ“ Document your specific use case on [GitHub](https://github.com/aristanetworks/anta) - πŸ” Share your findings with the community + +## πŸ“š References + +- **Python AsyncIO** + - [AsyncIO Documentation](https://docs.python.org/3/library/asyncio.html) + - [AsyncIO Event Loop](https://docs.python.org/3/library/asyncio-eventloop.html) + - [AsyncIO Tasks and Coroutines](https://docs.python.org/3/library/asyncio-task.html) + +- **HTTPX** + - [HTTPX Documentation](https://www.python-httpx.org/) + - [Timeouts](https://www.python-httpx.org/advanced/timeouts/) + - [Resource Limits](https://www.python-httpx.org/advanced/resource-limits/) + +- **GNU Parallel** + - [GNU Parallel Documentation](https://www.gnu.org/software/parallel/) + - [GNU Parallel Tutorial](https://www.gnu.org/software/parallel/parallel_tutorial.html) + +- **JSON and YAML Processing** + - [jq Manual](https://jqlang.github.io/jq/manual/) + - [jq GitHub Repository](https://github.com/jqlang/jq) + - [yq Documentation](https://mikefarah.gitbook.io/yq/) + - [yq GitHub Repository](https://github.com/mikefarah/yq/) + +- **JUnit/xUnit Result XML Parser** + - [junitparser Documentation](https://junitparser.readthedocs.io/en/latest/) + - [junitparser GitHub Repository](https://github.com/weiwei/junitparser) diff --git a/docs/faq.md b/docs/faq.md index 7a5866337..443dd08f3 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -43,6 +43,9 @@ anta_title: Frequently Asked Questions (FAQ) The `user` is the one with which the ANTA process is started. The `value` is the new hard limit. The maximum value depends on the system. A hard limit of 16384 should be sufficient for ANTA to run in most high scale scenarios. After creating this file, log out the current session and log in again. + !!! 
tip "Large Scale Deployments" + For detailed information about managing file descriptors and other resource limits in large scale deployments, please refer to the [Scaling ANTA](advanced_usages/scaling.md#resource-management) guide. + ## `Timeout` error in the logs ???+ faq "`Timeout` error in the logs" @@ -62,6 +65,9 @@ anta_title: Frequently Asked Questions (FAQ) The previous command set a couple of options for ANTA NRFU, one them being the `timeout` command, by default, when running ANTA from CLI, it is set to 30s. The timeout is increased to 50s to allow ANTA to wait for API calls a little longer. + !!! tip "Advanced Timeout Configuration" + For comprehensive information about timeout configuration and optimization in large scale environments, see the [Timeouts Configuration](advanced_usages/scaling.md#timeouts-configuration) section in the Scaling ANTA guide. The guide provides detailed explanations of different timeout types and recommended values for various deployment scenarios. 
+ ## `ImportError` related to `urllib3` ???+ faq "`ImportError` related to `urllib3` when running ANTA" From d3abe570e02963e08ed8815845e8811a51b8fe37 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 7 Nov 2024 14:48:39 -0500 Subject: [PATCH 09/13] Single quote for Python --- anta/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anta/runner.py b/anta/runner.py index 8e9e455f5..30713cb7d 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -71,7 +71,7 @@ def log_run_information( f"Tests: {test_count} total\n" f"Limits:\n" f" Max concurrent tests: {max_concurrency}\n" - f" Max connections per device: {"Unlimited" if max_connections is None else max_connections}\n" + f" Max connections per device: {'Unlimited' if max_connections is None else max_connections}\n" f" Max file descriptors: {file_descriptor_limit}\n" f"{'':-^{width}}" ) From 2ccc0800190ca74d55876fcb02a3f0ac8f1dc06e Mon Sep 17 00:00:00 2001 From: gmuloc Date: Tue, 12 Nov 2024 16:22:26 +0100 Subject: [PATCH 10/13] Fix: Let's keep the artices --- anta/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anta/runner.py b/anta/runner.py index 30713cb7d..764f13e7a 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -475,7 +475,7 @@ async def main( # noqa: PLR0913 ) if dry_run: - logger.info("Dry-run mode, exiting before running tests.") + logger.info("Dry-run mode, exiting before running the tests.") async for test in generator: test.close() return From 8bb70931782bca2849c5c3b81f1e4abc8ad52553 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 28 Nov 2024 00:24:13 -0500 Subject: [PATCH 11/13] Added None supports for timeouts --- .pre-commit-config.yaml | 1 + anta/cli/utils.py | 31 ++++++- anta/device.py | 78 +----------------- anta/runner.py | 30 ++----- anta/settings.py | 142 +++++++++++++++++++++++++++++++++ pyproject.toml | 1 + tests/benchmark/test_runner.py | 31 ++++--- 7 files changed, 196 insertions(+), 118 deletions(-) create mode 100644 
anta/settings.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 97457ece5..50cac0094 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -74,6 +74,7 @@ repos: - pytest - pytest-codspeed - respx + - pydantic-settings - repo: https://github.com/codespell-project/codespell rev: v2.3.0 diff --git a/anta/cli/utils.py b/anta/cli/utils.py index a939c3220..1e1191177 100644 --- a/anta/cli/utils.py +++ b/anta/cli/utils.py @@ -19,7 +19,7 @@ from anta.inventory.exceptions import InventoryIncorrectSchemaError, InventoryRootKeyError if TYPE_CHECKING: - from click import Option + from click import Context, Option, Parameter logger = logging.getLogger(__name__) @@ -39,6 +39,32 @@ class ExitCode(enum.IntEnum): TESTS_FAILED = 4 +class FloatOrNoneParamType(click.ParamType): + """Click ParamType that accepts float values or 'None'. + + https://click.palletsprojects.com/en/stable/parameters/#how-to-implement-custom-types + """ + + name = "float_or_none" + + # pylint: disable=inconsistent-return-statements + def convert(self, value: str | float | None, param: Parameter | None, ctx: Context | None) -> float | None: + """Convert the value to a float or None.""" + if value is None or isinstance(value, float): + return value + + try: + if isinstance(value, str) and value.lower() == "none": + return None + return float(value) + except ValueError: + self.fail(f"{value!r} is not a valid float or 'None'", param, ctx) + # No return here because `self.fail` raises an exception + + +FLOAT_OR_NONE = FloatOrNoneParamType() + + def parse_tags(ctx: click.Context, param: Option, value: str | None) -> set[str] | None: # ruff: noqa: ARG001 """Click option callback to parse an ANTA inventory tags.""" @@ -162,6 +188,7 @@ def core_options(f: Callable[..., Any]) -> Callable[..., Any]: show_envvar=True, envvar="ANTA_TIMEOUT", show_default=True, + type=FLOAT_OR_NONE, ) @click.option( "--insecure", @@ -201,7 +228,7 @@ def wrapper( enable_password: str | None, enable: 
bool, prompt: bool, - timeout: float, + timeout: float | None, insecure: bool, disable_cache: bool, **kwargs: dict[str, Any], diff --git a/anta/device.py b/anta/device.py index 8d7b9fa75..a4e1903eb 100644 --- a/anta/device.py +++ b/anta/device.py @@ -7,7 +7,6 @@ import asyncio import logging -import os from abc import ABC, abstractmethod from collections import defaultdict from typing import TYPE_CHECKING, Any, Literal @@ -17,12 +16,13 @@ from aiocache import Cache from aiocache.plugins import HitMissRatioPlugin from asyncssh import SSHClientConnection, SSHClientConnectionOptions -from httpx import ConnectError, HTTPError, Limits, Timeout, TimeoutException +from httpx import ConnectError, HTTPError, TimeoutException import asynceapi from anta import __DEBUG__ from anta.logger import anta_log_exception, exc_to_str from anta.models import AntaCommand +from anta.settings import get_httpx_limits, get_httpx_timeout if TYPE_CHECKING: from collections.abc import Iterator @@ -34,78 +34,6 @@ # https://github.com/pyca/cryptography/issues/7236#issuecomment-1131908472 CLIENT_KEYS = asyncssh.public_key.load_default_keypairs() -ANTA_DEFAULT_TIMEOUT = 30.0 - - -def get_httpx_limits() -> Limits: - """Adjust the underlying HTTPX client resource limits. - - The limits are set using the following environment variables: - - ANTA_MAX_CONNECTIONS: Maximum number of allowable connections. - - ANTA_MAX_KEEPALIVE_CONNECTIONS: Number of allowable keep-alive connections. - - ANTA_KEEPALIVE_EXPIRY: Time limit on idle keep-alive connections in seconds. - - If any environment variable is not set or is invalid, the following HTTPX default limits are used: - - max_connections: 100 - - max_keepalive_connections: 20 - - keepalive_expiry: 5.0 - - These limits are set for all devices. - - Returns - ------- - Limits - HTTPX Limits object with configured connection limits. - - TODO: HTTPX supports None to disable limits. This is not implemented yet. 
- """ - try: - max_connections = int(os.environ.get("ANTA_MAX_CONNECTIONS", 100)) - max_keepalive_connections = int(os.environ.get("ANTA_MAX_KEEPALIVE_CONNECTIONS", 20)) - keepalive_expiry = float(os.environ.get("ANTA_KEEPALIVE_EXPIRY", 5.0)) - except ValueError as exc: - default_limits = Limits(max_connections=100, max_keepalive_connections=20, keepalive_expiry=5.0) - logger.warning("Error parsing HTTPX resource limits from environment variables: %s\nDefaults to %s", exc, default_limits) - return default_limits - return Limits(max_connections=max_connections, max_keepalive_connections=max_keepalive_connections, keepalive_expiry=keepalive_expiry) - - -def get_httpx_timeout(timeout: float | None) -> Timeout: - """Adjust the underlying HTTPX client timeout. - - The timeouts are set using the following environment variables: - - ANTA_CONNECT_TIMEOUT: Maximum amount of time to wait until a socket connection to the requested host is established. - - ANTA_READ_TIMEOUT: Maximum duration to wait for a chunk of data to be received (for example, a chunk of the response body). - - ANTA_WRITE_TIMEOUT: Maximum duration to wait for a chunk of data to be sent (for example, a chunk of the request body). - - ANTA_POOL_TIMEOUT: Maximum duration to wait for acquiring a connection from the connection pool. - - If any environment variable is not set or is invalid, the provided timeout value is used. - If no timeout is provided, 30 seconds is used which is the default when running ANTA. - - Parameters - ---------- - timeout : float | None - Global timeout value in seconds. Used if specific timeouts are not set. - - Returns - ------- - Timeout - HTTPX Timeout object with configured timeout values. - - TODO: HTTPX supports None to disable timeouts. This is not implemented yet. 
- """ - timeout = timeout if timeout is not None else ANTA_DEFAULT_TIMEOUT - try: - connect = float(os.environ.get("ANTA_CONNECT_TIMEOUT", timeout)) - read = float(os.environ.get("ANTA_READ_TIMEOUT", timeout)) - write = float(os.environ.get("ANTA_WRITE_TIMEOUT", timeout)) - pool = float(os.environ.get("ANTA_POOL_TIMEOUT", timeout)) - except ValueError as exc: - default_timeout = Timeout(timeout=timeout) - logger.warning("Error parsing HTTPX timeouts from environment variables: %s\nDefaults to %s", exc, default_timeout) - return default_timeout - return Timeout(connect=connect, read=read, write=write, pool=pool) - class AntaDevice(ABC): """Abstract class representing a device in ANTA. @@ -405,7 +333,7 @@ def _create_asynceapi_client( self, host: str, port: int | None, username: str, password: str, proto: Literal["http", "https"], timeout: float | None ) -> asynceapi.Device: """Create the asynceapi client with the provided parameters.""" - # Get resource limits and timeout values from environment variables or use default values + # Get resource limits and timeout values client_limits = get_httpx_limits() client_timeout = get_httpx_timeout(timeout) diff --git a/anta/runner.py b/anta/runner.py index 764f13e7a..1f69d1eb0 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -15,9 +15,9 @@ from anta import GITHUB_SUGGESTION from anta.cli.console import console -from anta.device import get_httpx_limits from anta.logger import anta_log_exception, exc_to_str from anta.models import AntaTest +from anta.settings import get_httpx_limits, get_max_concurrency from anta.tools import Catchtime, cprofile if TYPE_CHECKING: @@ -34,8 +34,6 @@ DEFAULT_NOFILE = 16384 """Default number of open file descriptors for the ANTA process.""" -DEFAULT_MAX_CONCURRENCY = 10000 -"""Default maximum number of tests to run concurrently.""" def log_run_information( @@ -95,32 +93,12 @@ def log_run_information( ) -def get_max_concurrency() -> int: - """Adjust the maximum number of tests (coroutines) to run 
concurrently. - - The limit is set to the value of the ANTA_MAX_CONCURRENCY environment variable. - - If the `ANTA_MAX_CONCURRENCY` environment variable is not set or is invalid, `DEFAULT_MAX_CONCURRENCY` is used (10000). - - Returns - ------- - int - The maximum number of tests to run concurrently. - """ - try: - max_concurrency = int(os.environ.get("ANTA_MAX_CONCURRENCY", DEFAULT_MAX_CONCURRENCY)) - except ValueError as exception: - logger.warning("The ANTA_MAX_CONCURRENCY environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_MAX_CONCURRENCY) - return DEFAULT_MAX_CONCURRENCY - return max_concurrency - - def adjust_rlimit_nofile() -> tuple[int, int]: """Adjust the maximum number of open file descriptors for the ANTA process. The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable. - If the `ANTA_NOFILE` environment variable is not set or is invalid, `DEFAULT_NOFILE` is used (16384). + If the `ANTA_NOFILE` environment variable is not set or is invalid, `DEFAULT_NOFILE` is used. Returns ------- @@ -181,6 +159,10 @@ async def run(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResult], N ------ The result of each completed test. """ + if limit <= 0: + msg = "Concurrency limit must be greater than 0." + raise RuntimeError(msg) + # NOTE: The `aiter` built-in function is not available in Python 3.9 aws = tests_generator.__aiter__() # pylint: disable=unnecessary-dunder-call aws_ended = False diff --git a/anta/settings.py b/anta/settings.py new file mode 100644 index 000000000..936fa9aa8 --- /dev/null +++ b/anta/settings.py @@ -0,0 +1,142 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. 
+"""Settings for ANTA.""" + +from __future__ import annotations + +from httpx import Limits, Timeout +from pydantic import Field, PositiveInt +from pydantic_settings import BaseSettings, SettingsConfigDict + +# Default values for HTTPX resource limits +HTTPX_MAX_CONNECTIONS = 100 +HTTPX_MAX_KEEPALIVE_CONNECTIONS = 20 +HTTPX_KEEPALIVE_EXPIRY = 5.0 + +# Default values for HTTPX timeouts +HTTPX_CONNECT_TIMEOUT = 5.0 +HTTPX_READ_TIMEOUT = 5.0 +HTTPX_WRITE_TIMEOUT = 5.0 +HTTPX_POOL_TIMEOUT = 5.0 + +# Default value for the maximum number of concurrent tests in the event loop +MAX_CONCURRENCY = 10000 + + +class MaxConcurrencySettings(BaseSettings): + """Environment variable for configuring the maximum number of concurrent tests in the event loop.""" + + model_config = SettingsConfigDict(env_prefix="ANTA_") + + max_concurrency: PositiveInt = Field(default=MAX_CONCURRENCY) + + +class HttpxResourceLimitsSettings(BaseSettings): + """Environment variables for configuring the underlying HTTPX client resource limits. + + The limits are set using the following environment variables: + - ANTA_MAX_CONNECTIONS: Maximum number of allowable connections. + - ANTA_MAX_KEEPALIVE_CONNECTIONS: Number of allowable keep-alive connections. + - ANTA_KEEPALIVE_EXPIRY: Time limit on idle keep-alive connections in seconds. + + If any environment variable is not set, the following HTTPX default limits are used: + - max_connections: 100 + - max_keepalive_connections: 20 + - keepalive_expiry: 5.0 + + These limits are set for all devices. `None` means no limit is set for the given operation. + + For more information on HTTPX resource limits, see: https://www.python-httpx.org/advanced/resource-limits/ + """ + + # The 'None' string is used to allow the environment variable to be set to `None`. 
+ model_config = SettingsConfigDict(env_parse_none_str="None", env_prefix="ANTA_") + + max_connections: int | None = Field(default=HTTPX_MAX_CONNECTIONS) + max_keepalive_connections: int | None = Field(default=HTTPX_MAX_KEEPALIVE_CONNECTIONS) + keepalive_expiry: float | None = Field(default=HTTPX_KEEPALIVE_EXPIRY) + + +class HttpxTimeoutsSettings(BaseSettings): + """Environment variables for configuring the underlying HTTPX client timeouts. + + The timeouts are set using the following environment variables: + - ANTA_CONNECT_TIMEOUT: Maximum amount of time to wait until a socket connection to the requested host is established. + - ANTA_READ_TIMEOUT: Maximum duration to wait for a chunk of data to be received (for example, a chunk of the response body). + - ANTA_WRITE_TIMEOUT: Maximum duration to wait for a chunk of data to be sent (for example, a chunk of the request body). + - ANTA_POOL_TIMEOUT: Maximum duration to wait for acquiring a connection from the connection pool. + + If any environment variable is not set, the default HTTPX timeout is used, 5 seconds. + `None` will disable the timeout for the given operation. + + For more information on HTTPX timeouts, see: https://www.python-httpx.org/advanced/timeouts/ + """ + + # The 'None' string is used to allow the environment variable to be set to `None`. 
+ model_config = SettingsConfigDict(env_parse_none_str="None", env_prefix="ANTA_") + + connect_timeout: float | None = Field(default=HTTPX_CONNECT_TIMEOUT) + read_timeout: float | None = Field(default=HTTPX_READ_TIMEOUT) + write_timeout: float | None = Field(default=HTTPX_WRITE_TIMEOUT) + pool_timeout: float | None = Field(default=HTTPX_POOL_TIMEOUT) + + # The following properties are used to determine if a specific timeout was set by an environment variable + @property + def connect_set(self) -> bool: + """Return True if the connect timeout was set by an environment variable.""" + return "connect_timeout" in self.model_fields_set + + @property + def read_set(self) -> bool: + """Return True if the read timeout was set by an environment variable.""" + return "read_timeout" in self.model_fields_set + + @property + def write_set(self) -> bool: + """Return True if the write timeout was set by an environment variable.""" + return "write_timeout" in self.model_fields_set + + @property + def pool_set(self) -> bool: + """Return True if the pool timeout was set by an environment variable.""" + return "pool_timeout" in self.model_fields_set + + +def get_max_concurrency() -> int: + """Get the maximum number of concurrent tests that can run in the event loop.""" + settings = MaxConcurrencySettings() + return settings.max_concurrency + + +def get_httpx_limits() -> Limits: + """Get the HTTPX Limits object from environment variables.""" + settings = HttpxResourceLimitsSettings() + return Limits( + max_connections=settings.max_connections, + max_keepalive_connections=settings.max_keepalive_connections, + keepalive_expiry=settings.keepalive_expiry, + ) + + +def get_httpx_timeout(default_timeout: float | None) -> Timeout: + """Get the HTTPX Timeout object from environment variables. + + Parameters + ---------- + default_timeout : float | None + Default timeout value to use if no specific timeout is set for a given operation. 
+ + Notes + ----- + When running ANTA NRFU from the command line, `default_timeout` is set to 30 seconds by default. + Otherwise, an `AsyncEOSDevice` class is instantiated with a `timeout` parameter set to `None` + by default, meaning no timeout is set. + """ + settings = HttpxTimeoutsSettings() + return Timeout( + connect=settings.connect_timeout if settings.connect_set else default_timeout, + read=settings.read_timeout if settings.read_set else default_timeout, + write=settings.write_timeout if settings.write_set else default_timeout, + pool=settings.pool_timeout if settings.pool_set else default_timeout, + ) diff --git a/pyproject.toml b/pyproject.toml index 6d2b30b03..fe36d623e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "Jinja2>=3.1.2", "pydantic>=2.7", "pydantic-extra-types>=2.3.0", + "pydantic-settings>=2.6.0", "PyYAML>=6.0", "requests>=2.31.0", "rich>=13.5.2,<14", diff --git a/tests/benchmark/test_runner.py b/tests/benchmark/test_runner.py index 8a5b41c1d..b020a85d0 100644 --- a/tests/benchmark/test_runner.py +++ b/tests/benchmark/test_runner.py @@ -7,7 +7,8 @@ from typing import TYPE_CHECKING -from anta.runner import prepare_tests +from anta.result_manager import ResultManager +from anta.runner import get_coroutines, prepare_tests if TYPE_CHECKING: from collections import defaultdict @@ -33,19 +34,15 @@ def _() -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None: assert sum(len(tests) for tests in selected_tests.values()) == len(inventory) * len(catalog.tests) -# ruff: noqa: ERA001 -# TODO: see what can be done with this benchmark -# def test_get_coroutines(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventory: AntaInventory) -> None: -# """Benchmark `anta.runner.get_coroutines`.""" -# result = setup_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) -# assert result is not None -# count, selected_tests = result -# -# assert selected_tests is not None -# -# coroutines = benchmark(lambda: 
get_coroutines(selected_tests=selected_tests, manager=ResultManager())) -# for coros in coroutines: -# coros.close() -# -# count = sum(len(tests) for tests in selected_tests.values()) -# assert count == len(coroutines) +def test_get_coroutines(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventory: AntaInventory) -> None: + """Benchmark `anta.runner.get_coroutines`.""" + selected_tests = prepare_tests(inventory=inventory, catalog=catalog, tests=None, tags=None) + + assert selected_tests is not None + + coroutines = benchmark(lambda: get_coroutines(selected_tests=selected_tests, manager=ResultManager())) + for coros in coroutines: + coros.close() + + count = sum(len(tests) for tests in selected_tests.values()) + assert count == len(coroutines) From 1416da0a4c948d8da374f19cee9ffd46190336fb Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 28 Nov 2024 19:15:03 -0500 Subject: [PATCH 12/13] Added unit tests --- anta/cli/exec/utils.py | 2 +- anta/device.py | 28 ++--- anta/runner.py | 2 +- tests/units/cli/test_utils.py | 33 ++++++ tests/units/conftest.py | 11 +- tests/units/test_runner.py | 213 +++++++++++++++++++++++++++++++++- tests/units/test_settings.py | 174 +++++++++++++++++++++++++++ 7 files changed, 444 insertions(+), 19 deletions(-) create mode 100644 tests/units/cli/test_utils.py create mode 100644 tests/units/test_settings.py diff --git a/anta/cli/exec/utils.py b/anta/cli/exec/utils.py index 272871a32..33a02220c 100644 --- a/anta/cli/exec/utils.py +++ b/anta/cli/exec/utils.py @@ -154,7 +154,7 @@ async def collect(device: AntaDevice) -> None: ) logger.warning("Configuring 'aaa authorization exec default local' on device %s", device.name) command = AntaCommand(command="show running-config | include aaa authorization exec default local", ofmt="text") - await device._client.cli(commands=commands) + await device._session.cli(commands=commands) logger.info("Configured 'aaa authorization exec default local' on device %s", device.name) logger.debug("'aaa 
authorization exec default local' is already configured on device %s", device.name) diff --git a/anta/device.py b/anta/device.py index a4e1903eb..a09efec94 100644 --- a/anta/device.py +++ b/anta/device.py @@ -324,20 +324,20 @@ def __init__( self._enable_password = enable_password # Create the async eAPI client - self._client = self._create_asynceapi_client(host, port, username, password, proto, timeout) + self._session = self._create_asynceapi_session(host, port, username, password, proto, timeout) # Create the SSH connection options self._ssh_opts = self._create_ssh_options(host, ssh_port, username, password, insecure=insecure) - def _create_asynceapi_client( + def _create_asynceapi_session( self, host: str, port: int | None, username: str, password: str, proto: Literal["http", "https"], timeout: float | None ) -> asynceapi.Device: """Create the asynceapi client with the provided parameters.""" # Get resource limits and timeout values - client_limits = get_httpx_limits() - client_timeout = get_httpx_timeout(timeout) + session_limits = get_httpx_limits() + session_timeout = get_httpx_timeout(timeout) - return asynceapi.Device(host=host, port=port, username=username, password=password, proto=proto, timeout=client_timeout, limits=client_limits) + return asynceapi.Device(host=host, port=port, username=username, password=password, proto=proto, timeout=session_timeout, limits=session_limits) def _create_ssh_options(self, host: str, port: int, username: str, password: str, *, insecure: bool) -> SSHClientConnectionOptions: """Create the SSH connection options with the provided parameters.""" @@ -351,8 +351,8 @@ def __rich_repr__(self) -> Iterator[tuple[str, Any]]: https://rich.readthedocs.io/en/stable/pretty.html#rich-repr-protocol. 
""" yield from super().__rich_repr__() - yield ("host", self._client.host) - yield ("eapi_port", self._client.port) + yield ("host", self._session.host) + yield ("eapi_port", self._session.port) yield ("username", self._ssh_opts.username) yield ("enable", self.enable) yield ("insecure", self._ssh_opts.known_hosts is None) @@ -361,7 +361,7 @@ def __rich_repr__(self) -> Iterator[tuple[str, Any]]: removed_pw = "" _ssh_opts["password"] = removed_pw _ssh_opts["kwargs"]["password"] = removed_pw - yield ("_client", vars(self._client)) + yield ("_session", vars(self._session)) yield ("_ssh_opts", _ssh_opts) def __repr__(self) -> str: @@ -373,8 +373,8 @@ def __repr__(self) -> str: f"is_online={self.is_online!r}, " f"established={self.established!r}, " f"disable_cache={self.cache is None!r}, " - f"host={self._client.host!r}, " - f"eapi_port={self._client.port!r}, " + f"host={self._session.host!r}, " + f"eapi_port={self._session.port!r}, " f"username={self._ssh_opts.username!r}, " f"enable={self.enable!r}, " f"insecure={self._ssh_opts.known_hosts is None!r})" @@ -386,7 +386,7 @@ def _keys(self) -> tuple[Any, ...]: This covers the use case of port forwarding when the host is localhost and the devices have different ports. """ - return (self._client.host, self._client.port) + return (self._session.host, self._session.port) async def _collect(self, command: AntaCommand, *, collection_id: str | None = None) -> None: # noqa: C901 function is too complex - because of many required except blocks """Collect device command output from EOS using aio-eapi. 
@@ -415,7 +415,7 @@ async def _collect(self, command: AntaCommand, *, collection_id: str | None = No commands.append({"cmd": "enable"}) commands += [{"cmd": command.command, "revision": command.revision}] if command.revision else [{"cmd": command.command}] try: - response: list[dict[str, Any] | str] = await self._client.cli( + response: list[dict[str, Any] | str] = await self._session.cli( commands=commands, ofmt=command.ofmt, version=command.version, @@ -437,7 +437,7 @@ async def _collect(self, command: AntaCommand, *, collection_id: str | None = No except TimeoutException as e: # This block catches Timeout exceptions. command.errors = [exc_to_str(e)] - timeouts = self._client.timeout.as_dict() + timeouts = self._session.timeout.as_dict() logger.error( "%s occurred while sending a command to %s.\n" "Current timeouts: Connect: %s | Read: %s | Write: %s | Pool: %s\n" @@ -475,7 +475,7 @@ async def refresh(self) -> None: - hw_model: The hardware model of the device """ logger.debug("Refreshing device %s", self.name) - self.is_online = await self._client.check_connection() + self.is_online = await self._session.check_connection() if self.is_online: show_version = AntaCommand(command="show version") await self._collect(show_version) diff --git a/anta/runner.py b/anta/runner.py index 1f69d1eb0..8fdd98866 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -59,7 +59,7 @@ def log_run_information( System file descriptor limit. 
""" # TODO: 34 is a magic numbers from RichHandler formatting catering for date, level and path - width = min(int(console.width) - 34, len("Maximum number of open file descriptors for the current ANTA process: 0000000000\n")) + width = min(int(console.width) - 34, len("Devices: 000 total, 000 established\n")) devices_total, devices_established = device_count diff --git a/tests/units/cli/test_utils.py b/tests/units/cli/test_utils.py new file mode 100644 index 000000000..b4874c592 --- /dev/null +++ b/tests/units/cli/test_utils.py @@ -0,0 +1,33 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Tests for anta.cli.utils.""" + +import pytest +from click import BadParameter + +from anta.cli.utils import FloatOrNoneParamType + + +def test_float_or_none_param_type() -> None: + """Test FloatOrNoneParamType click parameter type.""" + param_type = FloatOrNoneParamType() + + # Test valid float strings + assert param_type.convert("1.23", None, None) == 1.23 + assert param_type.convert("-4.56", None, None) == -4.56 + + # Test None values + assert param_type.convert(None, None, None) is None + assert param_type.convert("none", None, None) is None + assert param_type.convert("NONE", None, None) is None + + # Test float inputs + assert param_type.convert(1.23, None, None) == 1.23 + + # Test invalid inputs + with pytest.raises(BadParameter): + param_type.convert("invalid", None, None) + + with pytest.raises(BadParameter): + param_type.convert("1.2.3", None, None) diff --git a/tests/units/conftest.py b/tests/units/conftest.py index 665075c6f..3d4fa5ded 100644 --- a/tests/units/conftest.py +++ b/tests/units/conftest.py @@ -5,8 +5,10 @@ from __future__ import annotations +import os from pathlib import Path from typing import TYPE_CHECKING, Any +from unittest import mock from unittest.mock import patch import pytest @@ -15,7 +17,7 @@ from anta.device import AntaDevice, 
AsyncEOSDevice if TYPE_CHECKING: - from collections.abc import Iterator + from collections.abc import Generator, Iterator from anta.models import AntaCommand @@ -83,3 +85,10 @@ def yaml_file(request: pytest.FixtureRequest, tmp_path: Path) -> Path: content: dict[str, Any] = request.param file.write_text(yaml.dump(content, allow_unicode=True)) return file + + +@pytest.fixture +def setenvvar(monkeypatch: pytest.MonkeyPatch) -> Generator[pytest.MonkeyPatch, None, None]: + """Fixture to set environment variables for testing.""" + with mock.patch.dict(os.environ, clear=True): + yield monkeypatch diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py index 8d19a4d1a..a5d8865d4 100644 --- a/tests/units/test_runner.py +++ b/tests/units/test_runner.py @@ -5,21 +5,30 @@ from __future__ import annotations +import asyncio import logging import resource import sys from pathlib import Path -from unittest.mock import patch +from typing import TYPE_CHECKING, Any +from unittest.mock import Mock, patch import pytest from anta.catalog import AntaCatalog from anta.inventory import AntaInventory from anta.result_manager import ResultManager -from anta.runner import adjust_rlimit_nofile, main, prepare_tests + +# Import as Result to avoid PytestCollectionWarning +from anta.result_manager.models import TestResult as Result +from anta.runner import adjust_rlimit_nofile, get_coroutines, log_run_information, main, prepare_tests, run from .test_models import FakeTest, FakeTestWithMissingTest +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Coroutine + from warnings import WarningMessage + DATA_DIR: Path = Path(__file__).parent.parent.resolve() / "data" FAKE_CATALOG: AntaCatalog = AntaCatalog.from_list([(FakeTest, None)]) @@ -182,3 +191,203 @@ async def test_cannot_create_test(caplog: pytest.LogCaptureFixture, inventory: A else "Can't instantiate abstract class FakeTestWithMissingTest with abstract method test" ) assert msg in caplog.messages + + +async def 
test_get_coroutines_deprecation(inventory: AntaInventory) -> None: + """Test that get_coroutines raises a DeprecationWarning.""" + # Create selected tests with a single test + selected_tests = prepare_tests(inventory=inventory, catalog=FAKE_CATALOG, tags=None, tests=None) + + manager = ResultManager() + + with pytest.warns(DeprecationWarning) as warning_records: + coroutines = get_coroutines(selected_tests, manager) + + # Verify the warning + assert len(warning_records) == 1 + warning: WarningMessage = warning_records[0] + assert "get_coroutines" in str(warning.message) + assert "deprecated" in str(warning.message) + assert warning.category is DeprecationWarning + + # Verify the stacklevel + assert warning.filename == __file__ + + # Verify return type + assert isinstance(coroutines, list) + assert len(coroutines) == 1 + assert hasattr(coroutines[0], "__await__") + + # Await the coroutine to avoid RuntimeWarning + await coroutines[0] + + +class EmptyGenerator: + """Helper class to create an empty async generator.""" + + def __aiter__(self) -> EmptyGenerator: + """Make this class an async iterator.""" + return self + + async def __anext__(self) -> None: + """Raise StopAsyncIteration.""" + raise StopAsyncIteration + + +async def mock_test_coro(result: Result) -> Result: + """Mock coroutine simulating a test.""" + # Simulate some work + await asyncio.sleep(0.1) + return result + + +async def create_test_generator(results: list[Result]) -> AsyncGenerator[Coroutine[Any, Any, Result], None]: + """Create a test generator yielding mock test coroutines.""" + for result in results: + yield mock_test_coro(result) + + +async def test_run_with_zero_limit() -> None: + """Test that run raises RuntimeError when limit is 0.""" + mock_result = Mock(spec=Result) + generator = create_test_generator([mock_result]) + + with pytest.raises(RuntimeError, match="Concurrency limit must be greater than 0"): + await run(generator, limit=0).__anext__() + + +async def 
test_run_with_negative_limit() -> None: + """Test that run raises RuntimeError when limit is negative.""" + mock_result = Mock(spec=Result) + generator = create_test_generator([mock_result]) + + with pytest.raises(RuntimeError, match="Concurrency limit must be greater than 0"): + await run(generator, limit=-1).__anext__() + + +async def test_run_with_empty_generator(caplog: pytest.LogCaptureFixture) -> None: + """Test run behavior with an empty generator.""" + caplog.set_level(logging.DEBUG) + + results = [result async for result in run(EmptyGenerator(), limit=1)] + assert len(results) == 0 + assert "All tests have been added to the pending set" in caplog.text + assert "No pending tests and all tests have been processed. Exiting" in caplog.text + + +async def test_run_with_concurrent_limit(caplog: pytest.LogCaptureFixture) -> None: + """Test run behavior with concurrent limit.""" + caplog.set_level(logging.DEBUG) + + # Create 3 mock results + results = [Mock(spec=Result) for _ in range(3)] + generator = create_test_generator(results) + + # Run with limit of 2 to test concurrency limit + completed_results = [result async for result in run(generator, limit=2)] + + # Verify all results were returned + assert len(completed_results) == 3 + + # Verify logging messages + assert "Concurrency limit reached: 2 tests running" in caplog.text + assert any("Completed" in msg and "Pending count:" in msg for msg in caplog.messages) + + +async def test_run_sequential_execution(caplog: pytest.LogCaptureFixture) -> None: + """Test run with limit=1 for sequential execution.""" + caplog.set_level(logging.DEBUG) + + # Create mock results with different values to verify order + results = [Mock(spec=Result, value=i) for i in range(3)] + generator = create_test_generator(results) + + # Run with limit of 1 to ensure sequential execution + completed_results = [result async for result in run(generator, limit=1)] + + # Verify results came back in order + assert len(completed_results) == 3 + 
assert all(completed_results[i].value == i for i in range(3)) + assert "Concurrency limit reached: 1 tests running" in caplog.text + + +async def test_run_immediate_stop_iteration(caplog: pytest.LogCaptureFixture) -> None: + """Test run behavior when generator raises StopIteration immediately.""" + caplog.set_level(logging.DEBUG) + + results = [result async for result in run(EmptyGenerator(), limit=1)] + assert len(results) == 0 + assert "All tests have been added to the pending set" in caplog.text + assert "No pending tests and all tests have been processed. Exiting" in caplog.text + + +def test_log_run_information_basic_logging(caplog: pytest.LogCaptureFixture) -> None: + """Test basic logging output with typical values.""" + caplog.set_level(logging.INFO) + + log_run_information(device_count=(5, 3), test_count=10, max_concurrency=20, max_connections=5, file_descriptor_limit=1024) + + assert "ANTA NRFU Run Information" in caplog.text + assert "Devices: 5 total, 3 established" in caplog.text + assert "Tests: 10 total" in caplog.text + assert "Max concurrent tests: 20" in caplog.text + assert "Max connections per device: 5" in caplog.text + assert "Max file descriptors: 1024" in caplog.text + assert len(caplog.records) == 1 + + +def test_log_run_information_unlimited_connections(caplog: pytest.LogCaptureFixture) -> None: + """Test logging when max_connections is None (unlimited).""" + caplog.set_level(logging.INFO) + + log_run_information(device_count=(3, 2), test_count=10, max_concurrency=20, max_connections=None, file_descriptor_limit=1024) + + assert "Max connections per device: Unlimited" in caplog.text + assert "Running with unlimited HTTP connections" in caplog.text + assert "file descriptor limit (1024)" in caplog.text + assert any(record.levelno == logging.WARNING for record in caplog.records) + + +def test_log_run_information_exceeding_concurrency(caplog: pytest.LogCaptureFixture) -> None: + """Test warning when test count exceeds max concurrency.""" + 
caplog.set_level(logging.INFO) + + log_run_information(device_count=(2, 2), test_count=30, max_concurrency=20, max_connections=5, file_descriptor_limit=1024) + + assert "Tests count (30) exceeds concurrent limit (20)" in caplog.text + assert "Tests will be throttled" in caplog.text + assert any(record.levelno == logging.WARNING for record in caplog.records) + + +def test_log_run_information_exceeding_file_descriptor_limit(caplog: pytest.LogCaptureFixture) -> None: + """Test warning when potential connections exceed file descriptor limit.""" + caplog.set_level(logging.INFO) + + log_run_information(device_count=(5, 5), test_count=10, max_concurrency=20, max_connections=300, file_descriptor_limit=1024) + + assert "Potential connections (1500) exceeds file descriptor limit (1024)" in caplog.text + assert "Connection errors may occur" in caplog.text + assert any(record.levelno == logging.WARNING for record in caplog.records) + + +def test_log_run_information_multiple_warnings(caplog: pytest.LogCaptureFixture) -> None: + """Test multiple warning conditions occurring simultaneously.""" + caplog.set_level(logging.INFO) + + log_run_information(device_count=(3, 3), test_count=50, max_concurrency=20, max_connections=400, file_descriptor_limit=1024) + + warning_records = [r for r in caplog.records if r.levelno == logging.WARNING] + assert len(warning_records) == 2 + assert "Tests count (50) exceeds concurrent limit (20)" in caplog.text + assert "Potential connections (1200) exceeds file descriptor limit (1024)" in caplog.text + + +def test_log_run_information_no_warnings(caplog: pytest.LogCaptureFixture) -> None: + """Test case where no warnings should be logged.""" + caplog.set_level(logging.INFO) + + log_run_information(device_count=(2, 2), test_count=10, max_concurrency=20, max_connections=10, file_descriptor_limit=1024) + + warning_records = [r for r in caplog.records if r.levelno == logging.WARNING] + assert len(warning_records) == 0 + assert len(caplog.records) == 1 diff 
--git a/tests/units/test_settings.py b/tests/units/test_settings.py new file mode 100644 index 000000000..d633cf33b --- /dev/null +++ b/tests/units/test_settings.py @@ -0,0 +1,174 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Unit tests for the anta.settings module.""" + +from __future__ import annotations + +import pytest +from httpx import Limits, Timeout +from pydantic import ValidationError + +from anta.settings import ( + HTTPX_CONNECT_TIMEOUT, + HTTPX_KEEPALIVE_EXPIRY, + HTTPX_MAX_CONNECTIONS, + HTTPX_MAX_KEEPALIVE_CONNECTIONS, + HTTPX_POOL_TIMEOUT, + HTTPX_READ_TIMEOUT, + HTTPX_WRITE_TIMEOUT, + MAX_CONCURRENCY, + HttpxResourceLimitsSettings, + HttpxTimeoutsSettings, + MaxConcurrencySettings, + get_httpx_limits, + get_httpx_timeout, + get_max_concurrency, +) + + +class TestMaxConcurrency: + """Tests for the MaxConcurrencySettings class.""" + + def test_default(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test default max concurrency value.""" + settings = MaxConcurrencySettings() + assert settings.max_concurrency == MAX_CONCURRENCY + + def test_env_var(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test setting max concurrency via environment variable.""" + setenvvar.setenv("ANTA_MAX_CONCURRENCY", "500") + assert get_max_concurrency() == 500 + + def test_validation(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test validation of max concurrency value.""" + setenvvar.setenv("ANTA_MAX_CONCURRENCY", "-1") + with pytest.raises(ValidationError): + MaxConcurrencySettings() + + setenvvar.setenv("ANTA_MAX_CONCURRENCY", "0") + with pytest.raises(ValidationError): + MaxConcurrencySettings() + + setenvvar.setenv("ANTA_MAX_CONCURRENCY", "Unlimited") + with pytest.raises(ValidationError): + MaxConcurrencySettings() + + +class TestHttpxLimits: + """Tests for the HttpxResourceLimitsSettings class.""" + + def test_default(self, setenvvar: 
pytest.MonkeyPatch) -> None: + """Test default HTTPX limits.""" + settings = HttpxResourceLimitsSettings() + assert settings.max_connections == HTTPX_MAX_CONNECTIONS + assert settings.max_keepalive_connections == HTTPX_MAX_KEEPALIVE_CONNECTIONS + assert settings.keepalive_expiry == HTTPX_KEEPALIVE_EXPIRY + + def test_env_vars(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test setting HTTPX limits via environment variables.""" + setenvvar.setenv("ANTA_MAX_CONNECTIONS", "200") + setenvvar.setenv("ANTA_MAX_KEEPALIVE_CONNECTIONS", "40") + setenvvar.setenv("ANTA_KEEPALIVE_EXPIRY", "10.0") + + limits = get_httpx_limits() + assert isinstance(limits, Limits) + assert limits.max_connections == 200 + assert limits.max_keepalive_connections == 40 + assert limits.keepalive_expiry == 10.0 + + def test_none_values(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test setting HTTPX limits to None via environment variables.""" + setenvvar.setenv("ANTA_MAX_CONNECTIONS", "None") + setenvvar.setenv("ANTA_MAX_KEEPALIVE_CONNECTIONS", "None") + setenvvar.setenv("ANTA_KEEPALIVE_EXPIRY", "None") + + limits = get_httpx_limits() + assert limits.max_connections is None + assert limits.max_keepalive_connections is None + assert limits.keepalive_expiry is None + + def test_mixed_values(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test mixing None and numeric values.""" + setenvvar.setenv("ANTA_MAX_CONNECTIONS", "None") + setenvvar.setenv("ANTA_MAX_KEEPALIVE_CONNECTIONS", "50") + setenvvar.setenv("ANTA_KEEPALIVE_EXPIRY", "15.0") + + limits = get_httpx_limits() + assert limits.max_connections is None + assert limits.max_keepalive_connections == 50 + assert limits.keepalive_expiry == 15.0 + + +class TestHttpxTimeouts: + """Tests for the HttpxTimeoutsSettings class.""" + + def test_default(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test default HTTPX timeout values.""" + settings = HttpxTimeoutsSettings() + assert settings.connect_timeout == HTTPX_CONNECT_TIMEOUT + assert 
settings.read_timeout == HTTPX_READ_TIMEOUT + assert settings.write_timeout == HTTPX_WRITE_TIMEOUT + assert settings.pool_timeout == HTTPX_POOL_TIMEOUT + + def test_env_vars(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test setting HTTPX timeouts via environment variables.""" + setenvvar.setenv("ANTA_CONNECT_TIMEOUT", "10.0") + setenvvar.setenv("ANTA_READ_TIMEOUT", "15.0") + setenvvar.setenv("ANTA_WRITE_TIMEOUT", "20.0") + setenvvar.setenv("ANTA_POOL_TIMEOUT", "25.0") + + timeout = get_httpx_timeout(default_timeout=30.0) + assert isinstance(timeout, Timeout) + assert timeout.connect == 10.0 + assert timeout.read == 15.0 + assert timeout.write == 20.0 + assert timeout.pool == 25.0 + + def test_none_values(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test setting HTTPX timeouts to None (no timeout) via environment variables.""" + setenvvar.setenv("ANTA_CONNECT_TIMEOUT", "None") + setenvvar.setenv("ANTA_READ_TIMEOUT", "None") + setenvvar.setenv("ANTA_WRITE_TIMEOUT", "None") + setenvvar.setenv("ANTA_POOL_TIMEOUT", "None") + + timeout = get_httpx_timeout(default_timeout=30.0) + assert timeout.connect is None + assert timeout.read is None + assert timeout.write is None + assert timeout.pool is None + + def test_property_flags(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test the timeout property flags that indicate if values were set by environment variables.""" + settings = HttpxTimeoutsSettings() + assert not settings.connect_set + assert not settings.read_set + assert not settings.write_set + assert not settings.pool_set + + setenvvar.setenv("ANTA_CONNECT_TIMEOUT", "None") + setenvvar.setenv("ANTA_READ_TIMEOUT", "15.0") + settings = HttpxTimeoutsSettings() + assert settings.connect_set + assert settings.read_set + assert not settings.write_set + assert not settings.pool_set + + def test_timeout_with_default_none(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test behavior when default_timeout is None.""" + timeout = 
get_httpx_timeout(default_timeout=None) + assert timeout.connect is None + assert timeout.read is None + assert timeout.write is None + assert timeout.pool is None + + def test_mixed_timeouts(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test mixing different timeout configurations.""" + setenvvar.setenv("ANTA_CONNECT_TIMEOUT", "5.0") + setenvvar.setenv("ANTA_READ_TIMEOUT", "None") + + timeout = get_httpx_timeout(default_timeout=10.0) + assert timeout.connect == 5.0 + assert timeout.read is None + assert timeout.write == 10.0 + assert timeout.pool == 10.0 From 78a7900a1833e1fffccccd7166f0e849afb12640 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 28 Nov 2024 19:57:37 -0500 Subject: [PATCH 13/13] Fix mypy --- anta/settings.py | 16 +++---- tests/units/test_device.py | 10 ++--- tests/units/test_runner.py | 30 +++---------- tests/units/test_settings.py | 84 ++++++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 36 deletions(-) diff --git a/anta/settings.py b/anta/settings.py index 936fa9aa8..091ce8ee8 100644 --- a/anta/settings.py +++ b/anta/settings.py @@ -6,7 +6,7 @@ from __future__ import annotations from httpx import Limits, Timeout -from pydantic import Field, PositiveInt +from pydantic import Field, NonNegativeFloat, NonNegativeInt, PositiveInt from pydantic_settings import BaseSettings, SettingsConfigDict # Default values for HTTPX resource limits @@ -53,9 +53,9 @@ class HttpxResourceLimitsSettings(BaseSettings): # The 'None' string is used to allow the environment variable to be set to `None`. 
model_config = SettingsConfigDict(env_parse_none_str="None", env_prefix="ANTA_") - max_connections: int | None = Field(default=HTTPX_MAX_CONNECTIONS) - max_keepalive_connections: int | None = Field(default=HTTPX_MAX_KEEPALIVE_CONNECTIONS) - keepalive_expiry: float | None = Field(default=HTTPX_KEEPALIVE_EXPIRY) + max_connections: NonNegativeInt | None = Field(default=HTTPX_MAX_CONNECTIONS) + max_keepalive_connections: NonNegativeInt | None = Field(default=HTTPX_MAX_KEEPALIVE_CONNECTIONS) + keepalive_expiry: NonNegativeFloat | None = Field(default=HTTPX_KEEPALIVE_EXPIRY) class HttpxTimeoutsSettings(BaseSettings): @@ -76,10 +76,10 @@ class HttpxTimeoutsSettings(BaseSettings): # The 'None' string is used to allow the environment variable to be set to `None`. model_config = SettingsConfigDict(env_parse_none_str="None", env_prefix="ANTA_") - connect_timeout: float | None = Field(default=HTTPX_CONNECT_TIMEOUT) - read_timeout: float | None = Field(default=HTTPX_READ_TIMEOUT) - write_timeout: float | None = Field(default=HTTPX_WRITE_TIMEOUT) - pool_timeout: float | None = Field(default=HTTPX_POOL_TIMEOUT) + connect_timeout: NonNegativeFloat | None = Field(default=HTTPX_CONNECT_TIMEOUT) + read_timeout: NonNegativeFloat | None = Field(default=HTTPX_READ_TIMEOUT) + write_timeout: NonNegativeFloat | None = Field(default=HTTPX_WRITE_TIMEOUT) + pool_timeout: NonNegativeFloat | None = Field(default=HTTPX_POOL_TIMEOUT) # The following properties are used to determine if a specific timeout was set by an environment variable @property diff --git a/tests/units/test_device.py b/tests/units/test_device.py index cdaba34ea..faf614481 100644 --- a/tests/units/test_device.py +++ b/tests/units/test_device.py @@ -565,11 +565,11 @@ def test__eq(self, device1: dict[str, Any], device2: dict[str, Any], expected: b ) async def test_refresh(self, async_device: AsyncEOSDevice, patch_kwargs: list[dict[str, Any]], expected: dict[str, Any]) -> None: """Test AsyncEOSDevice.refresh().""" - with 
patch.object(async_device._client, "check_connection", **patch_kwargs[0]), patch.object(async_device._client, "cli", **patch_kwargs[1]): + with patch.object(async_device._session, "check_connection", **patch_kwargs[0]), patch.object(async_device._session, "cli", **patch_kwargs[1]): await async_device.refresh() - async_device._client.check_connection.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.check_connection is patched + async_device._session.check_connection.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.check_connection is patched if expected["is_online"]: - async_device._client.cli.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.cli is patched + async_device._session.cli.assert_called_once() # type: ignore[attr-defined] # asynceapi.Device.cli is patched assert async_device.is_online == expected["is_online"] assert async_device.established == expected["established"] assert async_device.hw_model == expected["hw_model"] @@ -582,7 +582,7 @@ async def test_refresh(self, async_device: AsyncEOSDevice, patch_kwargs: list[di async def test__collect(self, async_device: AsyncEOSDevice, command: dict[str, Any], expected: dict[str, Any]) -> None: """Test AsyncEOSDevice._collect().""" cmd = AntaCommand(command=command["command"], revision=command["revision"]) if "revision" in command else AntaCommand(command=command["command"]) - with patch.object(async_device._client, "cli", **command["patch_kwargs"]): + with patch.object(async_device._session, "cli", **command["patch_kwargs"]): collection_id = "pytest" await async_device.collect(cmd, collection_id=collection_id) commands: list[dict[str, Any]] = [] @@ -600,7 +600,7 @@ async def test__collect(self, async_device: AsyncEOSDevice, command: dict[str, A commands.append({"cmd": cmd.command, "revision": cmd.revision}) else: commands.append({"cmd": cmd.command}) - async_device._client.cli.assert_called_once_with(commands=commands, ofmt=cmd.ofmt, version=cmd.version, 
req_id=f"ANTA-{collection_id}-{id(cmd)}") # type: ignore[attr-defined] # asynceapi.Device.cli is patched + async_device._session.cli.assert_called_once_with(commands=commands, ofmt=cmd.ofmt, version=cmd.version, req_id=f"ANTA-{collection_id}-{id(cmd)}") # type: ignore[attr-defined] # asynceapi.Device.cli is patched assert cmd.output == expected["output"] assert cmd.errors == expected["errors"] diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py index a5d8865d4..4ed3de22b 100644 --- a/tests/units/test_runner.py +++ b/tests/units/test_runner.py @@ -26,7 +26,7 @@ from .test_models import FakeTest, FakeTestWithMissingTest if TYPE_CHECKING: - from collections.abc import AsyncGenerator, Coroutine + from collections.abc import AsyncGenerator, AsyncIterator, Coroutine, Sequence from warnings import WarningMessage DATA_DIR: Path = Path(__file__).parent.parent.resolve() / "data" @@ -201,6 +201,7 @@ async def test_get_coroutines_deprecation(inventory: AntaInventory) -> None: manager = ResultManager() with pytest.warns(DeprecationWarning) as warning_records: + assert selected_tests is not None coroutines = get_coroutines(selected_tests, manager) # Verify the warning @@ -225,11 +226,11 @@ async def test_get_coroutines_deprecation(inventory: AntaInventory) -> None: class EmptyGenerator: """Helper class to create an empty async generator.""" - def __aiter__(self) -> EmptyGenerator: + def __aiter__(self) -> AsyncIterator[Coroutine[Any, Any, Result]]: """Make this class an async iterator.""" return self - async def __anext__(self) -> None: + async def __anext__(self) -> Coroutine[Any, Any, Result]: """Raise StopAsyncIteration.""" raise StopAsyncIteration @@ -241,7 +242,7 @@ async def mock_test_coro(result: Result) -> Result: return result -async def create_test_generator(results: list[Result]) -> AsyncGenerator[Coroutine[Any, Any, Result], None]: +async def create_test_generator(results: Sequence[Result]) -> AsyncGenerator[Coroutine[Any, Any, Result], None]: 
"""Create a test generator yielding mock test coroutines.""" for result in results: yield mock_test_coro(result) @@ -269,7 +270,7 @@ async def test_run_with_empty_generator(caplog: pytest.LogCaptureFixture) -> Non """Test run behavior with an empty generator.""" caplog.set_level(logging.DEBUG) - results = [result async for result in run(EmptyGenerator(), limit=1)] + results = [result async for result in run(EmptyGenerator(), limit=1)] # type: ignore[arg-type] assert len(results) == 0 assert "All tests have been added to the pending set" in caplog.text assert "No pending tests and all tests have been processed. Exiting" in caplog.text @@ -294,28 +295,11 @@ async def test_run_with_concurrent_limit(caplog: pytest.LogCaptureFixture) -> No assert any("Completed" in msg and "Pending count:" in msg for msg in caplog.messages) -async def test_run_sequential_execution(caplog: pytest.LogCaptureFixture) -> None: - """Test run with limit=1 for sequential execution.""" - caplog.set_level(logging.DEBUG) - - # Create mock results with different values to verify order - results = [Mock(spec=Result, value=i) for i in range(3)] - generator = create_test_generator(results) - - # Run with limit of 1 to ensure sequential execution - completed_results = [result async for result in run(generator, limit=1)] - - # Verify results came back in order - assert len(completed_results) == 3 - assert all(completed_results[i].value == i for i in range(3)) - assert "Concurrency limit reached: 1 tests running" in caplog.text - - async def test_run_immediate_stop_iteration(caplog: pytest.LogCaptureFixture) -> None: """Test run behavior when generator raises StopIteration immediately.""" caplog.set_level(logging.DEBUG) - results = [result async for result in run(EmptyGenerator(), limit=1)] + results = [result async for result in run(EmptyGenerator(), limit=1)] # type: ignore[arg-type] assert len(results) == 0 assert "All tests have been added to the pending set" in caplog.text assert "No pending tests 
and all tests have been processed. Exiting" in caplog.text diff --git a/tests/units/test_settings.py b/tests/units/test_settings.py index d633cf33b..f919d3556 100644 --- a/tests/units/test_settings.py +++ b/tests/units/test_settings.py @@ -99,6 +99,43 @@ def test_mixed_values(self, setenvvar: pytest.MonkeyPatch) -> None: assert limits.max_keepalive_connections == 50 assert limits.keepalive_expiry == 15.0 + def test_httpx_limits_validation(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test validation of HTTPX resource limits values.""" + # Test negative values + setenvvar.setenv("ANTA_MAX_CONNECTIONS", "-1") + with pytest.raises(ValidationError): + HttpxResourceLimitsSettings() + + setenvvar.setenv("ANTA_MAX_KEEPALIVE_CONNECTIONS", "-5") + with pytest.raises(ValidationError): + HttpxResourceLimitsSettings() + + setenvvar.setenv("ANTA_KEEPALIVE_EXPIRY", "-2.5") + with pytest.raises(ValidationError): + HttpxResourceLimitsSettings() + + # Test invalid string values + setenvvar.setenv("ANTA_MAX_CONNECTIONS", "unlimited") + with pytest.raises(ValidationError): + HttpxResourceLimitsSettings() + + setenvvar.setenv("ANTA_MAX_KEEPALIVE_CONNECTIONS", "infinity") + with pytest.raises(ValidationError): + HttpxResourceLimitsSettings() + + setenvvar.setenv("ANTA_KEEPALIVE_EXPIRY", "forever") + with pytest.raises(ValidationError): + HttpxResourceLimitsSettings() + + # Test zero values (should be valid for NonNegative types) + setenvvar.setenv("ANTA_MAX_CONNECTIONS", "0") + setenvvar.setenv("ANTA_MAX_KEEPALIVE_CONNECTIONS", "0") + setenvvar.setenv("ANTA_KEEPALIVE_EXPIRY", "0.0") + settings = HttpxResourceLimitsSettings() + assert settings.max_connections == 0 + assert settings.max_keepalive_connections == 0 + assert settings.keepalive_expiry == 0.0 + class TestHttpxTimeouts: """Tests for the HttpxTimeoutsSettings class.""" @@ -172,3 +209,50 @@ def test_mixed_timeouts(self, setenvvar: pytest.MonkeyPatch) -> None: assert timeout.read is None assert timeout.write == 10.0 assert 
timeout.pool == 10.0 + + def test_httpx_timeouts_validation(self, setenvvar: pytest.MonkeyPatch) -> None: + """Test validation of HTTPX timeout values.""" + # Test negative values + setenvvar.setenv("ANTA_CONNECT_TIMEOUT", "-1.0") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + setenvvar.setenv("ANTA_READ_TIMEOUT", "-5.0") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + setenvvar.setenv("ANTA_WRITE_TIMEOUT", "-2.5") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + setenvvar.setenv("ANTA_POOL_TIMEOUT", "-3.0") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + # Test invalid string values + setenvvar.setenv("ANTA_CONNECT_TIMEOUT", "instant") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + setenvvar.setenv("ANTA_READ_TIMEOUT", "forever") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + setenvvar.setenv("ANTA_WRITE_TIMEOUT", "unlimited") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + setenvvar.setenv("ANTA_POOL_TIMEOUT", "infinite") + with pytest.raises(ValidationError): + HttpxTimeoutsSettings() + + # Test zero values (should be valid for NonNegative types) + setenvvar.setenv("ANTA_CONNECT_TIMEOUT", "0.0") + setenvvar.setenv("ANTA_READ_TIMEOUT", "0.0") + setenvvar.setenv("ANTA_WRITE_TIMEOUT", "0.0") + setenvvar.setenv("ANTA_POOL_TIMEOUT", "0.0") + settings = HttpxTimeoutsSettings() + assert settings.connect_timeout == 0.0 + assert settings.read_timeout == 0.0 + assert settings.write_timeout == 0.0 + assert settings.pool_timeout == 0.0