From f702c917ed4820a4cf34c3b7ba86944458b530f1 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 19 Dec 2024 05:49:54 -0500 Subject: [PATCH] fix(anta): Fix ResultManager statistics (#962) --- anta/result_manager/__init__.py | 79 +++++++++++++--- anta/runner.py | 13 ++- tests/units/cli/nrfu/test_commands.py | 2 +- tests/units/result_manager/test__init__.py | 101 +++++++++++++++++++++ 4 files changed, 174 insertions(+), 21 deletions(-) diff --git a/anta/result_manager/__init__.py b/anta/result_manager/__init__.py index ef0c4dfcb..b5b0f393f 100644 --- a/anta/result_manager/__init__.py +++ b/anta/result_manager/__init__.py @@ -6,6 +6,7 @@ from __future__ import annotations import json +import logging from collections import defaultdict from functools import cached_property from itertools import chain @@ -15,7 +16,10 @@ from .models import CategoryStats, DeviceStats, TestStats +logger = logging.getLogger(__name__) + +# pylint: disable=too-many-instance-attributes class ResultManager: """Helper to manage Test Results and generate reports. @@ -69,6 +73,15 @@ class ResultManager: ] """ + _result_entries: list[TestResult] + status: AntaTestStatus + error_status: bool + + _device_stats: defaultdict[str, DeviceStats] + _category_stats: defaultdict[str, CategoryStats] + _test_stats: defaultdict[str, TestStats] + _stats_in_sync: bool + def __init__(self) -> None: """Class constructor. @@ -98,9 +111,8 @@ def reset(self) -> None: self.status: AntaTestStatus = AntaTestStatus.UNSET self.error_status = False - self.device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats) - self.category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats) - self.test_stats: defaultdict[str, TestStats] = defaultdict(TestStats) + # Initialize the statistics attributes + self._reset_stats() def __len__(self) -> int: """Implement __len__ method to count number of results.""" @@ -115,14 +127,7 @@ def results(self) -> list[TestResult]: def results(self, value: list[TestResult]) -> None: """Set the list of TestResult.""" # When setting the results, we need to reset the state of the current instance - self._result_entries = [] - self.status = AntaTestStatus.UNSET - self.error_status = False - - # Also reset the stats attributes - self.device_stats = defaultdict(DeviceStats) - self.category_stats = defaultdict(CategoryStats) - self.test_stats = defaultdict(TestStats) + self.reset() for result in value: self.add(result) @@ -137,9 +142,28 @@ def json(self) -> str: """Get a JSON representation of the results.""" return json.dumps(self.dump, indent=4) + @property + def device_stats(self) -> defaultdict[str, DeviceStats]: + """Get the device statistics.""" + self._ensure_stats_in_sync() + return self._device_stats + + @property + def category_stats(self) -> defaultdict[str, CategoryStats]: + """Get the category statistics.""" + self._ensure_stats_in_sync() + return self._category_stats + + @property + def test_stats(self) -> defaultdict[str, TestStats]: + """Get the test statistics.""" + self._ensure_stats_in_sync() + return self._test_stats + @property def sorted_category_stats(self) -> dict[str, CategoryStats]: """A property that returns the category_stats dictionary sorted by key name.""" + self._ensure_stats_in_sync() return dict(sorted(self.category_stats.items())) @cached_property @@ -163,6 +187,13 @@ def _update_status(self, test_status: AntaTestStatus) -> None: elif self.status == "success" and test_status == "failure": self.status = AntaTestStatus.FAILURE + def _reset_stats(self) -> None: + """Create or reset the statistics attributes.""" + self._device_stats = defaultdict(DeviceStats) + self._category_stats = defaultdict(CategoryStats) + self._test_stats = defaultdict(TestStats) + self._stats_in_sync = False + def _update_stats(self, result: TestResult) -> None: """Update the statistics based on the test result. @@ -174,7 +205,7 @@ def _update_stats(self, result: TestResult) -> None: count_attr = f"tests_{result.result}_count" # Update device stats - device_stats: DeviceStats = self.device_stats[result.name] + device_stats: DeviceStats = self._device_stats[result.name] setattr(device_stats, count_attr, getattr(device_stats, count_attr) + 1) if result.result in ("failure", "error"): device_stats.tests_failure.add(result.test) @@ -184,16 +215,34 @@ def _update_stats(self, result: TestResult) -> None: # Update category stats for category in result.categories: - category_stats: CategoryStats = self.category_stats[category] + category_stats: CategoryStats = self._category_stats[category] setattr(category_stats, count_attr, getattr(category_stats, count_attr) + 1) # Update test stats count_attr = f"devices_{result.result}_count" - test_stats: TestStats = self.test_stats[result.test] + test_stats: TestStats = self._test_stats[result.test] setattr(test_stats, count_attr, getattr(test_stats, count_attr) + 1) if result.result in ("failure", "error"): test_stats.devices_failure.add(result.name) + def _compute_stats(self) -> None: + """Compute all statistics from the current results.""" + logger.info("Computing statistics for all results.") + + # Reset all stats + self._reset_stats() + + # Recompute stats for all results + for result in self._result_entries: + self._update_stats(result) + + self._stats_in_sync = True + + def _ensure_stats_in_sync(self) -> None: + """Ensure statistics are in sync with current results.""" + if not self._stats_in_sync: + self._compute_stats() + def add(self, result: TestResult) -> None: """Add a result to the ResultManager instance. @@ -207,7 +256,7 @@ def add(self, result: TestResult) -> None: """ self._result_entries.append(result) self._update_status(result.result) - self._update_stats(result) + self._stats_in_sync = False # Every time a new result is added, we need to clear the cached property self.__dict__.pop("results_by_status", None) diff --git a/anta/runner.py b/anta/runner.py index 384aceb5f..4c6da928a 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -179,7 +179,7 @@ def prepare_tests( return device_to_tests -def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager) -> list[Coroutine[Any, Any, TestResult]]: +def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager | None = None) -> list[Coroutine[Any, Any, TestResult]]: """Get the coroutines for the ANTA run. Parameters @@ -187,7 +187,7 @@ def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinitio selected_tests A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function. manager - A ResultManager + An optional ResultManager object to pre-populate with the test results. Used in dry-run mode. Returns ------- @@ -199,7 +199,8 @@ def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinitio for test in test_definitions: try: test_instance = test.test(device=device, inputs=test.inputs) - manager.add(test_instance.result) + if manager is not None: + manager.add(test_instance.result) coros.append(test_instance.test()) except Exception as e: # noqa: PERF203, BLE001 # An AntaTest instance is potentially user-defined code. @@ -292,7 +293,7 @@ async def main( "Please consult the ANTA FAQ." ) - coroutines = get_coroutines(selected_tests, manager) + coroutines = get_coroutines(selected_tests, manager if dry_run else None) if dry_run: logger.info("Dry-run mode, exiting before running the tests.") @@ -304,6 +305,8 @@ async def main( AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coroutines)) with Catchtime(logger=logger, message="Running ANTA tests"): - await asyncio.gather(*coroutines) + results = await asyncio.gather(*coroutines) + for result in results: + manager.add(result) log_cache_statistics(selected_inventory.devices) diff --git a/tests/units/cli/nrfu/test_commands.py b/tests/units/cli/nrfu/test_commands.py index 0d8046c01..372c86a8f 100644 --- a/tests/units/cli/nrfu/test_commands.py +++ b/tests/units/cli/nrfu/test_commands.py @@ -79,7 +79,7 @@ def test_anta_nrfu_text(click_runner: CliRunner) -> None: def test_anta_nrfu_text_multiple_failures(click_runner: CliRunner) -> None: """Test anta nrfu text with multiple failures, catalog is given via env.""" result = click_runner.invoke(anta, ["nrfu", "text"], env={"ANTA_CATALOG": str(DATA_DIR / "test_catalog_double_failure.yml")}) - assert result.exit_code == ExitCode.OK + assert result.exit_code == ExitCode.TESTS_FAILED assert ( """spine1 :: VerifyInterfacesSpeed :: FAILURE Interface `Ethernet2` is not found. diff --git a/tests/units/result_manager/test__init__.py b/tests/units/result_manager/test__init__.py index 1fd51cbe7..e41a436f6 100644 --- a/tests/units/result_manager/test__init__.py +++ b/tests/units/result_manager/test__init__.py @@ -6,6 +6,7 @@ from __future__ import annotations import json +import logging import re from contextlib import AbstractContextManager, nullcontext from typing import TYPE_CHECKING, Callable @@ -379,3 +380,103 @@ def test_get_devices(self, test_result_factory: Callable[[], TestResult], list_r assert len(result_manager.get_devices()) == 2 assert all(t in result_manager.get_devices() for t in ["Device1", "Device2"]) + + def test_stats_computation_methods(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None: + """Test ResultManager internal stats computation methods.""" + result_manager = ResultManager() + + # Initially stats should be unsynced + assert result_manager._stats_in_sync is False + + # Test _reset_stats + result_manager._reset_stats() + assert result_manager._stats_in_sync is False + assert len(result_manager._device_stats) == 0 + assert len(result_manager._category_stats) == 0 + assert len(result_manager._test_stats) == 0 + + # Add some test results + test1 = test_result_factory() + test1.name = "device1" + test1.result = AntaTestStatus.SUCCESS + test1.categories = ["system"] + test1.test = "test1" + + test2 = test_result_factory() + test2.name = "device2" + test2.result = AntaTestStatus.FAILURE + test2.categories = ["interfaces"] + test2.test = "test2" + + result_manager.add(test1) + result_manager.add(test2) + + # Stats should still be unsynced after adding results + assert result_manager._stats_in_sync is False + + # Test _compute_stats directly + with caplog.at_level(logging.INFO): + result_manager._compute_stats() + assert "Computing statistics for all results" in caplog.text + assert result_manager._stats_in_sync is True + + # Verify stats content + assert len(result_manager._device_stats) == 2 + assert len(result_manager._category_stats) == 2 + assert len(result_manager._test_stats) == 2 + assert result_manager._device_stats["device1"].tests_success_count == 1 + assert result_manager._device_stats["device2"].tests_failure_count == 1 + assert result_manager._category_stats["system"].tests_success_count == 1 + assert result_manager._category_stats["interfaces"].tests_failure_count == 1 + assert result_manager._test_stats["test1"].devices_success_count == 1 + assert result_manager._test_stats["test2"].devices_failure_count == 1 + + def test_stats_property_computation(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None: + """Test that stats are computed only once when accessed via properties.""" + result_manager = ResultManager() + + # Add some test results + test1 = test_result_factory() + test1.name = "device1" + test1.result = AntaTestStatus.SUCCESS + test1.categories = ["system"] + result_manager.add(test1) + + test2 = test_result_factory() + test2.name = "device2" + test2.result = AntaTestStatus.FAILURE + test2.categories = ["interfaces"] + result_manager.add(test2) + + # Stats should be unsynced after adding results + assert result_manager._stats_in_sync is False + assert "Computing statistics" not in caplog.text + + # Access device_stats property - should trigger computation + with caplog.at_level(logging.INFO): + _ = result_manager.device_stats + assert "Computing statistics for all results" in caplog.text + assert result_manager._stats_in_sync is True + + # Clear the log + caplog.clear() + + # Access other stats properties - should not trigger computation again + with caplog.at_level(logging.INFO): + _ = result_manager.category_stats + _ = result_manager.test_stats + _ = result_manager.sorted_category_stats + assert "Computing statistics" not in caplog.text + + # Add another result - should mark stats as unsynced + test3 = test_result_factory() + test3.name = "device3" + test3.result = "error" + result_manager.add(test3) + assert result_manager._stats_in_sync is False + + # Access stats again - should trigger recomputation + with caplog.at_level(logging.INFO): + _ = result_manager.device_stats + assert "Computing statistics for all results" in caplog.text + assert result_manager._stats_in_sync is True