Skip to content

Commit

Permalink
fix(anta): Fix ResultManager statistics (#962)
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon authored Dec 19, 2024
1 parent 6ee2c08 commit f702c91
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 21 deletions.
79 changes: 64 additions & 15 deletions anta/result_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import json
import logging
from collections import defaultdict
from functools import cached_property
from itertools import chain
Expand All @@ -15,7 +16,10 @@

from .models import CategoryStats, DeviceStats, TestStats

logger = logging.getLogger(__name__)


# pylint: disable=too-many-instance-attributes
class ResultManager:
"""Helper to manage Test Results and generate reports.
Expand Down Expand Up @@ -69,6 +73,15 @@ class ResultManager:
]
"""

_result_entries: list[TestResult]
status: AntaTestStatus
error_status: bool

_device_stats: defaultdict[str, DeviceStats]
_category_stats: defaultdict[str, CategoryStats]
_test_stats: defaultdict[str, TestStats]
_stats_in_sync: bool

def __init__(self) -> None:
"""Class constructor.
Expand Down Expand Up @@ -98,9 +111,8 @@ def reset(self) -> None:
self.status: AntaTestStatus = AntaTestStatus.UNSET
self.error_status = False

self.device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats)
self.category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats)
self.test_stats: defaultdict[str, TestStats] = defaultdict(TestStats)
# Initialize the statistics attributes
self._reset_stats()

def __len__(self) -> int:
"""Implement __len__ method to count number of results."""
Expand All @@ -115,14 +127,7 @@ def results(self) -> list[TestResult]:
def results(self, value: list[TestResult]) -> None:
"""Set the list of TestResult."""
# When setting the results, we need to reset the state of the current instance
self._result_entries = []
self.status = AntaTestStatus.UNSET
self.error_status = False

# Also reset the stats attributes
self.device_stats = defaultdict(DeviceStats)
self.category_stats = defaultdict(CategoryStats)
self.test_stats = defaultdict(TestStats)
self.reset()

for result in value:
self.add(result)
Expand All @@ -137,9 +142,28 @@ def json(self) -> str:
"""Get a JSON representation of the results."""
return json.dumps(self.dump, indent=4)

@property
def device_stats(self) -> defaultdict[str, DeviceStats]:
"""Get the device statistics."""
self._ensure_stats_in_sync()
return self._device_stats

@property
def category_stats(self) -> defaultdict[str, CategoryStats]:
"""Get the category statistics."""
self._ensure_stats_in_sync()
return self._category_stats

@property
def test_stats(self) -> defaultdict[str, TestStats]:
"""Get the test statistics."""
self._ensure_stats_in_sync()
return self._test_stats

@property
def sorted_category_stats(self) -> dict[str, CategoryStats]:
"""A property that returns the category_stats dictionary sorted by key name."""
self._ensure_stats_in_sync()
return dict(sorted(self.category_stats.items()))

@cached_property
Expand All @@ -163,6 +187,13 @@ def _update_status(self, test_status: AntaTestStatus) -> None:
elif self.status == "success" and test_status == "failure":
self.status = AntaTestStatus.FAILURE

def _reset_stats(self) -> None:
"""Create or reset the statistics attributes."""
self._device_stats = defaultdict(DeviceStats)
self._category_stats = defaultdict(CategoryStats)
self._test_stats = defaultdict(TestStats)
self._stats_in_sync = False

def _update_stats(self, result: TestResult) -> None:
"""Update the statistics based on the test result.
Expand All @@ -174,7 +205,7 @@ def _update_stats(self, result: TestResult) -> None:
count_attr = f"tests_{result.result}_count"

# Update device stats
device_stats: DeviceStats = self.device_stats[result.name]
device_stats: DeviceStats = self._device_stats[result.name]
setattr(device_stats, count_attr, getattr(device_stats, count_attr) + 1)
if result.result in ("failure", "error"):
device_stats.tests_failure.add(result.test)
Expand All @@ -184,16 +215,34 @@ def _update_stats(self, result: TestResult) -> None:

# Update category stats
for category in result.categories:
category_stats: CategoryStats = self.category_stats[category]
category_stats: CategoryStats = self._category_stats[category]
setattr(category_stats, count_attr, getattr(category_stats, count_attr) + 1)

# Update test stats
count_attr = f"devices_{result.result}_count"
test_stats: TestStats = self.test_stats[result.test]
test_stats: TestStats = self._test_stats[result.test]
setattr(test_stats, count_attr, getattr(test_stats, count_attr) + 1)
if result.result in ("failure", "error"):
test_stats.devices_failure.add(result.name)

def _compute_stats(self) -> None:
"""Compute all statistics from the current results."""
logger.info("Computing statistics for all results.")

# Reset all stats
self._reset_stats()

# Recompute stats for all results
for result in self._result_entries:
self._update_stats(result)

self._stats_in_sync = True

def _ensure_stats_in_sync(self) -> None:
"""Ensure statistics are in sync with current results."""
if not self._stats_in_sync:
self._compute_stats()

def add(self, result: TestResult) -> None:
"""Add a result to the ResultManager instance.
Expand All @@ -207,7 +256,7 @@ def add(self, result: TestResult) -> None:
"""
self._result_entries.append(result)
self._update_status(result.result)
self._update_stats(result)
self._stats_in_sync = False

# Every time a new result is added, we need to clear the cached property
self.__dict__.pop("results_by_status", None)
Expand Down
13 changes: 8 additions & 5 deletions anta/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ def prepare_tests(
return device_to_tests


def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager) -> list[Coroutine[Any, Any, TestResult]]:
def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager | None = None) -> list[Coroutine[Any, Any, TestResult]]:
"""Get the coroutines for the ANTA run.
Parameters
----------
selected_tests
A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function.
manager
A ResultManager
An optional ResultManager object to pre-populate with the test results. Used in dry-run mode.
Returns
-------
Expand All @@ -199,7 +199,8 @@ def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinitio
for test in test_definitions:
try:
test_instance = test.test(device=device, inputs=test.inputs)
manager.add(test_instance.result)
if manager is not None:
manager.add(test_instance.result)
coros.append(test_instance.test())
except Exception as e: # noqa: PERF203, BLE001
# An AntaTest instance is potentially user-defined code.
Expand Down Expand Up @@ -292,7 +293,7 @@ async def main(
"Please consult the ANTA FAQ."
)

coroutines = get_coroutines(selected_tests, manager)
coroutines = get_coroutines(selected_tests, manager if dry_run else None)

if dry_run:
logger.info("Dry-run mode, exiting before running the tests.")
Expand All @@ -304,6 +305,8 @@ async def main(
AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coroutines))

with Catchtime(logger=logger, message="Running ANTA tests"):
await asyncio.gather(*coroutines)
results = await asyncio.gather(*coroutines)
for result in results:
manager.add(result)

log_cache_statistics(selected_inventory.devices)
2 changes: 1 addition & 1 deletion tests/units/cli/nrfu/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_anta_nrfu_text(click_runner: CliRunner) -> None:
def test_anta_nrfu_text_multiple_failures(click_runner: CliRunner) -> None:
"""Test anta nrfu text with multiple failures, catalog is given via env."""
result = click_runner.invoke(anta, ["nrfu", "text"], env={"ANTA_CATALOG": str(DATA_DIR / "test_catalog_double_failure.yml")})
assert result.exit_code == ExitCode.OK
assert result.exit_code == ExitCode.TESTS_FAILED
assert (
"""spine1 :: VerifyInterfacesSpeed :: FAILURE
Interface `Ethernet2` is not found.
Expand Down
101 changes: 101 additions & 0 deletions tests/units/result_manager/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import json
import logging
import re
from contextlib import AbstractContextManager, nullcontext
from typing import TYPE_CHECKING, Callable
Expand Down Expand Up @@ -379,3 +380,103 @@ def test_get_devices(self, test_result_factory: Callable[[], TestResult], list_r

assert len(result_manager.get_devices()) == 2
assert all(t in result_manager.get_devices() for t in ["Device1", "Device2"])

def test_stats_computation_methods(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None:
"""Test ResultManager internal stats computation methods."""
result_manager = ResultManager()

# Initially stats should be unsynced
assert result_manager._stats_in_sync is False

# Test _reset_stats
result_manager._reset_stats()
assert result_manager._stats_in_sync is False
assert len(result_manager._device_stats) == 0
assert len(result_manager._category_stats) == 0
assert len(result_manager._test_stats) == 0

# Add some test results
test1 = test_result_factory()
test1.name = "device1"
test1.result = AntaTestStatus.SUCCESS
test1.categories = ["system"]
test1.test = "test1"

test2 = test_result_factory()
test2.name = "device2"
test2.result = AntaTestStatus.FAILURE
test2.categories = ["interfaces"]
test2.test = "test2"

result_manager.add(test1)
result_manager.add(test2)

# Stats should still be unsynced after adding results
assert result_manager._stats_in_sync is False

# Test _compute_stats directly
with caplog.at_level(logging.INFO):
result_manager._compute_stats()
assert "Computing statistics for all results" in caplog.text
assert result_manager._stats_in_sync is True

# Verify stats content
assert len(result_manager._device_stats) == 2
assert len(result_manager._category_stats) == 2
assert len(result_manager._test_stats) == 2
assert result_manager._device_stats["device1"].tests_success_count == 1
assert result_manager._device_stats["device2"].tests_failure_count == 1
assert result_manager._category_stats["system"].tests_success_count == 1
assert result_manager._category_stats["interfaces"].tests_failure_count == 1
assert result_manager._test_stats["test1"].devices_success_count == 1
assert result_manager._test_stats["test2"].devices_failure_count == 1

def test_stats_property_computation(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None:
"""Test that stats are computed only once when accessed via properties."""
result_manager = ResultManager()

# Add some test results
test1 = test_result_factory()
test1.name = "device1"
test1.result = AntaTestStatus.SUCCESS
test1.categories = ["system"]
result_manager.add(test1)

test2 = test_result_factory()
test2.name = "device2"
test2.result = AntaTestStatus.FAILURE
test2.categories = ["interfaces"]
result_manager.add(test2)

# Stats should be unsynced after adding results
assert result_manager._stats_in_sync is False
assert "Computing statistics" not in caplog.text

# Access device_stats property - should trigger computation
with caplog.at_level(logging.INFO):
_ = result_manager.device_stats
assert "Computing statistics for all results" in caplog.text
assert result_manager._stats_in_sync is True

# Clear the log
caplog.clear()

# Access other stats properties - should not trigger computation again
with caplog.at_level(logging.INFO):
_ = result_manager.category_stats
_ = result_manager.test_stats
_ = result_manager.sorted_category_stats
assert "Computing statistics" not in caplog.text

# Add another result - should mark stats as unsynced
test3 = test_result_factory()
test3.name = "device3"
test3.result = "error"
result_manager.add(test3)
assert result_manager._stats_in_sync is False

# Access stats again - should trigger recomputation
with caplog.at_level(logging.INFO):
_ = result_manager.device_stats
assert "Computing statistics for all results" in caplog.text
assert result_manager._stats_in_sync is True

0 comments on commit f702c91

Please sign in to comment.