Skip to content

Commit

Permalink
issue-505: Updated the testcase for check RIB
Browse files Browse the repository at this point in the history
  • Loading branch information
MaheshGSLAB committed Jan 11, 2024
1 parent a97ff44 commit 321cf4a
Show file tree
Hide file tree
Showing 3 changed files with 918 additions and 70 deletions.
124 changes: 95 additions & 29 deletions anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# mypy: disable-error-code=attr-defined
from __future__ import annotations

from ipaddress import IPv4Address, IPv6Address
from ipaddress import IPv4Address, IPv4Network, IPv6Address
from typing import Any, List, Optional, Union, cast

from pydantic import BaseModel, PositiveInt, model_validator
Expand Down Expand Up @@ -132,6 +132,30 @@ def _check_peer_issues(peer_data: Optional[dict[str, Any]]) -> dict[str, Any]:
return {}


def check_mismatch(path_data: dict[str, Any], failures: dict[str, Any]) -> dict[str, Any]:
"""
This function checks for mismatches between 'bgp_paths' and 'ip_paths' in the given path data.
If a mismatch is found, it updates the failures dictionary with the mismatched paths.
Parameters:
path_data (dict): A dictionary containing path data.
It is expected to have a nested structure: {prefix: {vrf: {path_info}}}.
failures (dict): A dictionary to record any mismatches found.
It is expected to have a nested structure: {prefix: {vrf: {failure_info}}}.
Returns:
dict: The updated failures dictionary with any mismatches found.
"""
for prefix, vrf_data in path_data.items():
for vrf, paths_info in vrf_data.items():
if "bgp_paths" in paths_info and "ip_paths" in paths_info:
# Check for mismatch
if paths_info["ip_paths"] != paths_info["bgp_paths"]:
# Create nested dictionary structure for failures
failures.setdefault(prefix, {}).setdefault(vrf, {})["mismatched_paths"] = paths_info
return failures


class VerifyBGPPeerCount(AntaTest):
"""
This test verifies the count of BGP peers for a given address family.
Expand Down Expand Up @@ -449,60 +473,102 @@ def test(self) -> None:
self.result.is_failure(f"Failures: {list(failures.values())}")


class VerifyBGPEcmpPath(AntaTest):
class VerifyBGPEcmp(AntaTest):
"""
Verifies the installation of BGP Equal-Cost Multipath (ECMP) path in the specified VRF.
Expected results:
* success: The test will pass if a BGP route is contributed in ecmp and at least one path is an ecmp head in the specified VRF.
* failure: The test will fail if a BGP route is not found, no contributed in ecmp, or ecmp head is not found for any path in the specified VRF.
* success: The test will pass if a BGP prefix has ECMP installed and the next hop addresses are the same as the IP route in the specified VRF.
* failure: The test will fail if a BGP prefix is not found, ECMP path is not found and the ECMP paths do not match with the IP paths in the specified VRF.
"""

name = "VerifyBGPEcmpPath"
name = "VerifyBGPEcmp"
description = "Verifies the installation of BGP Equal-Cost Multipath (ECMP) path in the specified VRF."
categories = ["routing", "bgp"]
commands = [AntaCommand(command="show ip bgp")]
commands = [
AntaTemplate(template="show bgp ipv4 unicast {prefix} vrf {vrf}"),
AntaTemplate(template="show ip route vrf {vrf} {prefix}"),
]

class Input(AntaTest.Input):
"""
Input parameters for the test.
"""

bgp_routes: List[BgpPeers]
"""List of BGP routes"""
prefixes: List[BgpPrefixes]
"""List of BGP prefixes"""

class BgpPeers(BaseModel):
class BgpPrefixes(BaseModel):
"""
This class defines the details of a BGP route.
This class defines the details of a BGP prefix.
"""

route: str
"""IPv4/IPv6 BGP route"""
prefix: IPv4Network
"""IPv4 BGP prefix"""
vrf: str = "default"
"""VRF context"""
"""Optional VRF for BGP peer. If not provided, it defaults to `default`."""
ecmp_count: int
"""Maximum number of ECMP paths"""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Renders the template with the provided inputs. Returns a list of commands to be executed."""

return [template.render(prefix=bgp_prefix.prefix, vrf=bgp_prefix.vrf, ecmp_count=bgp_prefix.ecmp_count) for bgp_prefix in self.inputs.prefixes]

@AntaTest.anta_test
def test(self) -> None:
failures: dict[str, Any] = {}
path_data: dict[str, Any] = {}
ecmp_install = {}

# Iterate over each bgp route
for bgp_route in self.inputs.bgp_routes:
route = str(bgp_route.route)
vrf = bgp_route.vrf

# Verify if BGP route exist
if not (bgp_output := get_value(self.instance_commands[0].json_output, f"vrfs..{vrf}..bgpRouteEntries..{route}", separator="..")):
failures[str(route)] = {vrf: "Not configured"}
# Iterate over command output for different prefixes
for command in self.instance_commands:
prefix = str(command.params["prefix"])
vrf = command.params["vrf"]
ecmp_count = command.params["ecmp_count"]

# Determine if the command is for unicast
is_unicast = "unicast" in command.command
paths = get_value(
command.json_output,
f"vrfs..{vrf}..bgpRouteEntries..{prefix}..bgpRoutePaths" if is_unicast else f"vrfs..{vrf}..routes..{prefix}..vias",
separator="..",
)

# If paths do not exist, mark as not configured
if not paths:
failures.setdefault(prefix, {}).setdefault(vrf, {}).setdefault("unicast" if is_unicast else "rib", "Not configured")
continue

# Verify BGP route's ecmp head and contribution
bgp_paths = bgp_output.get("bgpRoutePaths", [])
failures[route] = {vrf: "ECMP path is not installed"}
for path in bgp_paths:
if path.get("routeType", {}).get("ecmpHead") and path.get("routeType", {}).get("ecmpContributor"):
failures.pop(route)
break
# Check the ecmp count and rib path count with respect to input
if len(paths) != ecmp_count:
failures.setdefault(prefix, {}).setdefault(vrf, {}).setdefault("ecmp_paths" if is_unicast else "rib_paths", len(paths))

# Process the paths based on the command type
addresses = set()
for path in paths:
if is_unicast:
# For unicast, check various routeType conditions
if all(get_value(path, f"routeType.{attr}") for attr in ["ecmpHead", "valid", "active"]):
ecmp_install[prefix] = True
addresses.add(path.get("nextHop"))
elif all(get_value(path, f"routeType.{attr}") for attr in ["ecmp", "ecmpContributor", "valid"]):
addresses.add(path.get("nextHop"))
else:
# For non-unicast, simply add the nexthopAddr
if not ecmp_install.get(prefix):
failures.setdefault(prefix, {}).setdefault(vrf, {})["ecmp_install"] = False
break
addresses.add(path.get("nexthopAddr"))

# Update path_data with the addresses
path_data.setdefault(prefix, {}).setdefault(vrf, {}).update(
{"bgp_paths" if is_unicast else "ip_paths": sorted(addresses)} if ecmp_install.get(prefix) else ""
)

# Compare bgp_paths and ip_paths
failures = check_mismatch(path_data, failures)

if not failures:
self.result.is_success()
else:
self.result.is_failure(f"Following BGP routes are not configured, not contributed in ecmp, or ecmp head is not found:\n{failures}")
self.result.is_failure(f"Following BGP prefixes are not configured, ECMP not installed, or ecmp path count is not matched:\n{failures}")
14 changes: 8 additions & 6 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,14 @@ anta.tests.routing:
- 10.1.255.0
- 10.1.255.2
- 10.1.255.4
- VerifyBGPEcmpPath:
bgp_routes:
- route: 192.0.255.1/32
vrf: default
- route: 192.0.254.3/32
vrf: default
- VerifyBGPEcmp:
prefixes:
- prefix: 192.0.254.5/32
vrf: MGMT
ecmp_count: 2
- prefix: 192.0.254.3/32
vrf: default
ecmp_count: 2
ospf:
- VerifyOSPFNeighborState:
- VerifyOSPFNeighborCount:
Expand Down
Loading

0 comments on commit 321cf4a

Please sign in to comment.