diff --git a/.gitignore b/.gitignore index 1ce52bd..651f6de 100644 --- a/.gitignore +++ b/.gitignore @@ -81,6 +81,7 @@ celerybeat-schedule .env # virtualenv +/.venv/ /venv/ /ENV/ diff --git a/README.md b/README.md index 18bdaaa..fd1699f 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ The KatharĂ¡ Lab Checker takes in input a configuration file specifying the test To run the tool you only need to run the `main.py` passing the desired configuration file. ```bash -python3 -m kathara-lab-checker --config +python3 -m kathara_lab_checker --config --labs ``` At this point, the tool parses the provided configuration file and executes the tests. For each network scenario the @@ -47,7 +47,7 @@ The repository already provide a complete example with the results of the tests. You can re-run the example by typing the following command in the root directory of the project: ```bash -python3 -m kathara-lab-checker --config examples/configuration_palabra.json --no-cache +python3 -m kathara_lab_checker --config examples/palabra/configuration_palabra.json --no-cache --labs examples/palabra/labs ``` The `--no-cache` flag force to repeat already executed tests. @@ -68,7 +68,8 @@ In the following you will find the possible values for the configuration file. ], "ip_mapping": { "": { - ">": "" # Check that the ip/netmask is configured on the interface of the device + ">": "" # Check that the ip/netmask is configured on the interface of the device + ">": "" # Check that the ip/netmask is configured on the interface eth# of the device }, }, "daemons": { @@ -81,7 +82,7 @@ In the following you will find the possible values for the configuration file. "kernel_routes": { "": [ "", # Check the presence of the route in the data-plane of the device - "[, [, ]]" # Check the presence of the route in the data-plane of the device + "[, [, , ]]" # Check the presence of the route in the data-plane of the device # And checks also that the nexthops are set on the correct interfaces ] }, @@ -119,18 +120,31 @@ In the following you will find the possible values for the configuration file. "", # Check if the device has the local_ns_ip as local name server. ] }, - "reachability": { - "": [ - "", # Check if device name reaches the dns_name - ] + "records": { + "A": { # The software can check for every type of DNS records + "": [ + "" # Check if the dns_name is resolved to the ip + ] + } } } }, "reachability": { # Check reachability between devices "": [ "", # Check if the device reaches the ip + "", # Check if the device reaches the dns_name ], - } + }, + "custom_commands": { # Execute a command inside a device and checks the output + "": [ + { + "command": "", # Command to execute + "regex_match": "", # Check if the output matches the regex + "output": "", # Check if the output is the expected one + "exit_code": # Check if the command exit code is the expected one + } + ] + } } } ``` diff --git a/examples/palabra/labs/lab1/lab1_result.xlsx b/examples/palabra/labs/lab1/lab1_result.xlsx index e13621e..ea7e582 100644 Binary files a/examples/palabra/labs/lab1/lab1_result.xlsx and b/examples/palabra/labs/lab1/lab1_result.xlsx differ diff --git a/examples/palabra/labs/lab2/lab2_result.xlsx b/examples/palabra/labs/lab2/lab2_result.xlsx index 2258280..78fc32e 100644 Binary files a/examples/palabra/labs/lab2/lab2_result.xlsx and b/examples/palabra/labs/lab2/lab2_result.xlsx differ diff --git a/examples/palabra/labs/lab3/lab3_result.xlsx b/examples/palabra/labs/lab3/lab3_result.xlsx index 03a54d5..acf07bf 100644 Binary files a/examples/palabra/labs/lab3/lab3_result.xlsx and b/examples/palabra/labs/lab3/lab3_result.xlsx differ diff --git a/examples/palabra/labs/lab4/lab4_result.xlsx b/examples/palabra/labs/lab4/lab4_result.xlsx index 94de856..5ac6e30 100644 Binary files a/examples/palabra/labs/lab4/lab4_result.xlsx and b/examples/palabra/labs/lab4/lab4_result.xlsx differ diff --git a/examples/palabra/labs/results.xlsx b/examples/palabra/labs/results.xlsx index 3702173..00f26d8 100644 Binary files a/examples/palabra/labs/results.xlsx and b/examples/palabra/labs/results.xlsx differ diff --git a/examples/palabra/results.xlsx b/examples/palabra/results.xlsx deleted file mode 100644 index 79a8ba8..0000000 Binary files a/examples/palabra/results.xlsx and /dev/null differ diff --git a/pyproject.toml b/pyproject.toml index 9aa6309..e5863e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kathara-lab-checker" - version = "0.1.2" + version = "0.1.3" description = "Tool to automatically check KatharĂ¡ network scenarios based on a configuration file." readme = "README.md" requires-python = ">=3.11" @@ -33,7 +33,7 @@ dependencies = [ ] [project.scripts] -kathara-lab-checker = "main:main" +kathara-lab-checker = "kathara_lab_checker:main" [project.urls] "Bug Reports" = "https://github.com/KatharaFramework/kathara-lab-checker/issues" diff --git a/src/main.py b/src/kathara_lab_checker.py similarity index 69% rename from src/main.py rename to src/kathara_lab_checker.py index 651bd3f..3611844 100644 --- a/src/main.py +++ b/src/kathara_lab_checker.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 import argparse -import importlib.metadata import json import logging import os @@ -17,29 +16,31 @@ from Kathara.setting.Setting import Setting from tqdm import tqdm -from kathara_lab_checker.TestCollector import TestCollector -from kathara_lab_checker.checks.BridgeCheck import BridgeCheck -from kathara_lab_checker.checks.CollisionDomainCheck import CollisionDomainCheck -from kathara_lab_checker.checks.DaemonCheck import DaemonCheck -from kathara_lab_checker.checks.DeviceExistenceCheck import DeviceExistenceCheck -from kathara_lab_checker.checks.IPv6EnabledCheck import IPv6EnabledCheck -from kathara_lab_checker.checks.InterfaceIPCheck import InterfaceIPCheck -from kathara_lab_checker.checks.KernelRouteCheck import KernelRouteCheck -from kathara_lab_checker.checks.ReachabilityCheck import ReachabilityCheck -from kathara_lab_checker.checks.StartupExistenceCheck import StartupExistenceCheck -from kathara_lab_checker.checks.SysctlCheck import SysctlCheck -from kathara_lab_checker.checks.applications.dns.DNSAuthorityCheck import DNSAuthorityCheck -from kathara_lab_checker.checks.applications.dns.LocalNSCheck import LocalNSCheck -from kathara_lab_checker.checks.protocols.AnnouncedNetworkCheck import AnnouncedNetworkCheck -from kathara_lab_checker.checks.protocols.ProtocolRedistributionCheck import ProtocolRedistributionCheck -from kathara_lab_checker.checks.protocols.bgp.BGPPeeringCheck import BGPPeeringCheck -from kathara_lab_checker.checks.protocols.evpn.AnnouncedVNICheck import AnnouncedVNICheck -from kathara_lab_checker.checks.protocols.evpn.EVPNSessionCheck import EVPNSessionCheck -from kathara_lab_checker.checks.protocols.evpn.VTEPCheck import VTEPCheck -from kathara_lab_checker.utils import reverse_dictionary, write_final_results_to_excel, write_result_to_excel - - -VERSION = "0.1.2" +from lab_checker.TestCollector import TestCollector +from lab_checker.checks.BridgeCheck import BridgeCheck +from lab_checker.checks.CollisionDomainCheck import CollisionDomainCheck +from lab_checker.checks.CustomCommandCheck import CustomCommandCheck +from lab_checker.checks.DaemonCheck import DaemonCheck +from lab_checker.checks.DeviceExistenceCheck import DeviceExistenceCheck +from lab_checker.checks.IPv6EnabledCheck import IPv6EnabledCheck +from lab_checker.checks.InterfaceIPCheck import InterfaceIPCheck +from lab_checker.checks.KernelRouteCheck import KernelRouteCheck +from lab_checker.checks.ReachabilityCheck import ReachabilityCheck +from lab_checker.checks.StartupExistenceCheck import StartupExistenceCheck +from lab_checker.checks.SysctlCheck import SysctlCheck +from lab_checker.checks.applications.dns.DNSAuthorityCheck import DNSAuthorityCheck +from lab_checker.checks.applications.dns.DNSRecordCheck import DNSRecordCheck +from lab_checker.checks.applications.dns.LocalNSCheck import LocalNSCheck +from lab_checker.checks.protocols.AnnouncedNetworkCheck import AnnouncedNetworkCheck +from lab_checker.checks.protocols.ProtocolRedistributionCheck import ProtocolRedistributionCheck +from lab_checker.checks.protocols.bgp.BGPPeeringCheck import BGPPeeringCheck +from lab_checker.checks.protocols.evpn.AnnouncedVNICheck import AnnouncedVNICheck +from lab_checker.checks.protocols.evpn.EVPNSessionCheck import EVPNSessionCheck +from lab_checker.checks.protocols.evpn.VTEPCheck import VTEPCheck +from lab_checker.excel_utils import write_final_results_to_excel, write_result_to_excel +from lab_checker.utils import reverse_dictionary + +VERSION = "0.1.3" CURRENT_LAB: Optional[Lab] = None @@ -51,9 +52,15 @@ def handler(signum, frame, live=False): exit(1) -def run_on_single_network_scenario(lab_path: str, configuration: dict, lab_template: Lab, - no_cache: bool = False, live: bool = False, keep_open: bool = False, - skip_report: bool = False): +def run_on_single_network_scenario( + lab_path: str, + configuration: dict, + lab_template: Lab, + no_cache: bool = False, + live: bool = False, + keep_open: bool = False, + skip_report: bool = False, +): global CURRENT_LAB logger = logging.getLogger("kathara-lab-checker") @@ -111,9 +118,10 @@ def run_on_single_network_scenario(lab_path: str, configuration: dict, lab_templ check_results = CollisionDomainCheck().run(list(lab_template.links.values()), lab) test_collector.add_check_results(lab_name, check_results) - logger.info("Checking that all required startup files exist...") - check_results = StartupExistenceCheck().run(configuration["test"]["requiring_startup"], lab) - test_collector.add_check_results(lab_name, check_results) + if "requiring_startup" in configuration["test"]: + logger.info("Checking that all required startup files exist...") + check_results = StartupExistenceCheck().run(configuration["test"]["requiring_startup"], lab) + test_collector.add_check_results(lab_name, check_results) if "ipv6_enabled" in configuration["test"]: logger.info(f"Checking that IPv6 is enabled on devices: {configuration['test']['ipv6_enabled']}") @@ -191,22 +199,32 @@ def run_on_single_network_scenario(lab_path: str, configuration: dict, lab_templ if "applications" in configuration["test"]: for application_name, application in configuration["test"]["applications"].items(): if application_name == "dns": - logger.info("Checking DNS configurations...") - check_results = DNSAuthorityCheck().run( - application["authoritative"], - list(application["local_ns"].keys()), - configuration["test"]["ip_mapping"], - lab, - ) - test_collector.add_check_results(lab_name, check_results) + if "authoritative" in application: + logger.info("Checking DNS configurations...") + check_results = DNSAuthorityCheck().run( + application["authoritative"], + list(application["local_ns"].keys()), + configuration["test"]["ip_mapping"], + lab, + ) + test_collector.add_check_results(lab_name, check_results) - logger.info("Checking local name servers configurations...") - check_results = LocalNSCheck().run(application["local_ns"], lab) - test_collector.add_check_results(lab_name, check_results) + if "local_ns" in application: + logger.info("Checking local name servers configurations...") + check_results = LocalNSCheck().run(application["local_ns"], lab) + test_collector.add_check_results(lab_name, check_results) - logger.info(f"Starting reachability test for DNS...") - check_results = ReachabilityCheck().run(reverse_dictionary(application["reachability"]), lab) - test_collector.add_check_results(lab_name, check_results) + if "records" in application: + logger.info(f"Starting test for DNS records...") + check_results = DNSRecordCheck().run( + application["records"], reverse_dictionary(application["local_ns"]).keys(), lab + ) + test_collector.add_check_results(lab_name, check_results) + + if "custom_commands" in configuration["test"]: + logger.info("Checking custom commands output...") + check_results = CustomCommandCheck().run(configuration["test"]["custom_commands"], lab) + test_collector.add_check_results(lab_name, check_results) if not live and not keep_open: logger.info("Undeploying Network Scenario") @@ -224,8 +242,15 @@ def run_on_single_network_scenario(lab_path: str, configuration: dict, lab_templ return test_collector -def run_on_multiple_network_scenarios(labs_path: str, configuration: dict, lab_template: Lab, no_cache: bool = False, - live: bool = False, keep_open: bool = False, skip_report: bool = False): +def run_on_multiple_network_scenarios( + labs_path: str, + configuration: dict, + lab_template: Lab, + no_cache: bool = False, + live: bool = False, + keep_open: bool = False, + skip_report: bool = False, +): logger = logging.getLogger("kathara-lab-checker") labs_path = os.path.abspath(labs_path) @@ -264,11 +289,7 @@ def parse_arguments(): help="The path to the configuration file for the tests", ) - parser.add_argument( - '-v', '--version', - action='version', - version=f'kathara-lab-checker {VERSION}' - ) + parser.add_argument("-v", "--version", action="version", version=f"kathara-lab-checker {VERSION}") parser.add_argument( "--no-cache", @@ -338,17 +359,23 @@ def main(): Setting.get_instance().load_from_dict({"image": conf["default_image"]}) logger.info(f"Parsing network scenarios template in: {conf['structure']}") + if not os.path.exists(conf["structure"]): + logger.error(f"The structure file {conf['structure']} does not exist") + exit(1) + template_lab = LabParser().parse( os.path.dirname(conf["structure"]), conf_name=os.path.basename(conf["structure"]), ) if args.lab: - run_on_single_network_scenario(args.lab, conf, template_lab, args.no_cache, args.live, args.keep_open, - args.skip_report) + run_on_single_network_scenario( + args.lab, conf, template_lab, args.no_cache, args.live, args.keep_open, args.skip_report + ) elif args.labs: - run_on_multiple_network_scenarios(args.labs, conf, template_lab, args.no_cache, args.live, args.keep_open, - args.skip_report) + run_on_multiple_network_scenarios( + args.labs, conf, template_lab, args.no_cache, args.live, args.keep_open, args.skip_report + ) if __name__ == "__main__": diff --git a/src/kathara_lab_checker/checks/KernelRouteCheck.py b/src/kathara_lab_checker/checks/KernelRouteCheck.py deleted file mode 100644 index bae0a33..0000000 --- a/src/kathara_lab_checker/checks/KernelRouteCheck.py +++ /dev/null @@ -1,54 +0,0 @@ -from typing import Union - -from Kathara.exceptions import MachineNotRunningError -from Kathara.model.Lab import Lab - -from kathara_lab_checker.utils import get_kernel_routes, load_routes_from_ip_route, load_routes_from_expected -from .AbstractCheck import AbstractCheck -from .CheckResult import CheckResult - - -class KernelRouteCheck(AbstractCheck): - def check(self, device_name: str, expected_routing_table: list, lab: Lab) -> list[CheckResult]: - self.description = f"Checking the routing table of {device_name}" - actual_routing_table = load_routes_from_ip_route(get_kernel_routes(device_name, lab)) - expected_routing_table = load_routes_from_expected(expected_routing_table) - - results = [] - - for dst, nexthops in expected_routing_table.items(): - if not dst in actual_routing_table: - check_result = CheckResult( - self.description, False, f"The routing table of {device_name} is missing route {dst}" - ) - results.append(check_result) - self.logger.info(check_result) - continue - if nexthops: - actual_nh = actual_routing_table[dst] - if actual_nh != nexthops: - check_result = CheckResult( - self.description, - False, - f"The routing table of {device_name} about route {dst} have the wrong next-hops: {nexthops ^ actual_nh}", - ) - results.append(check_result) - self.logger.info(check_result) - - if not results: - check_result = CheckResult(self.description, True, f"OK") - results.append(check_result) - self.logger.info(check_result) - - return results - - def run(self, devices_to_routes: dict[str, list[Union[str, list[str]]]], lab: Lab) -> list[CheckResult]: - results = [] - for device_name, expected_routes in devices_to_routes.items(): - self.logger.info(f"Checking kernel routes for `{device_name}`...") - try: - check_result = self.check(device_name, expected_routes, lab) - results = results + check_result - except MachineNotRunningError: - self.logger.warning(f"`{device_name}` is not running. Skipping checks...") - return results diff --git a/src/kathara_lab_checker/utils.py b/src/kathara_lab_checker/utils.py deleted file mode 100644 index 3117e33..0000000 --- a/src/kathara_lab_checker/utils.py +++ /dev/null @@ -1,214 +0,0 @@ -import json -import os -from typing import Any, Optional - -from Kathara.exceptions import MachineNotRunningError -from Kathara.manager.Kathara import Kathara -from Kathara.model.Lab import Lab -from openpyxl.styles import Alignment -from openpyxl.workbook import Workbook -from openpyxl.worksheet.worksheet import Worksheet - - -def red(s): - return f"\033[91m {s}\033[00m" - - -def green(s): - return f"\033[92m {s}\033[00m" - - -def yellow(s): - return f"\033[93m {s}\033[00m" - - -def get_output(exec_output): - output = "" - try: - while True: - (stdout, stderr) = next(exec_output) - stdout = stdout.decode("utf-8") if stdout else "" - stderr = stderr.decode("utf-8") if stderr else "" - - if stdout: - output += stdout - if stderr: - output += f"ERROR: {stderr}" - except StopIteration: - pass - return output - - -def get_interfaces_addresses(device_name: str, lab: Lab) -> dict: - kathara_manager = Kathara.get_instance() - - exec_output_gen = kathara_manager.exec( - machine_name=device_name, - command=f"ip -j address", - lab_hash=lab.hash, - ) - - return json.loads(get_output(exec_output_gen)) - - -def get_kernel_routes(device_name: str, lab: Lab) -> list[dict[str, Any]]: - kathara_manager = Kathara.get_instance() - - try: - output = get_output( - kathara_manager.exec(machine_name=device_name, lab_hash=lab.hash, command="ip -j route") - ) - except MachineNotRunningError: - return [] - - return json.loads(output) - - -def find_device_name_from_ip(ip_mapping: dict[str, dict[str, str]], ip_search: str) -> Optional[str]: - for device, ip_addresses in ip_mapping.items(): - for _, ip in ip_addresses.items(): - # Check if the base IP matches (ignoring the CIDR notation) - if ip.split("/")[0] == ip_search: - return device - raise Exception("Something is missing/wrong in the ip_mapping configuration!") - - -def find_lines_with_string(file_content, search_string): - """ - Returns lines from the provided multi-line string that contain the search string. - - :param file_content: A string representing the content of a file (multi-line string). - :param search_string: A string to search for in each line of the file content. - :return: A list of lines that contain the search string. - """ - # Splitting the string into lines - lines = file_content.split("\n") - - # Filtering lines that contain the search string - matching_lines = [line for line in lines if search_string in line] - - return matching_lines - - -def write_final_results_to_excel(test_collector: "TestCollectorPackage.TestCollector", path: str): - # Create a new Excel workbook - workbook = Workbook() - - # Select the active sheet - sheet = workbook.active - - sheet["A1"] = "Student Name" - sheet["B1"] = "Tests Passed" - sheet["C1"] = "Tests Failed" - sheet["D1"] = "Tests Total Number" - sheet["E1"] = "Problems" - - for index, (test_name, test_results) in enumerate(test_collector.tests.items()): - failed_tests = test_collector.get_failed(test_name) - passed_tests = test_collector.get_passed(test_name) - sheet["A" + str(index + 2)] = test_name - sheet["B" + str(index + 2)] = len(passed_tests) - sheet["C" + str(index + 2)] = len(failed_tests) - sheet["D" + str(index + 2)] = len(test_results) - - if failed_tests: - failed_string = "" - for idx, failed in enumerate(failed_tests): - failed_string += f"{(idx + 1)}: {failed.reason}\n" - if len(failed_string) >= 32767: - raise Exception("ERROR: Excel cell too big") - sheet["E" + str(index + 2)] = failed_string - sheet["E" + str(index + 2)].alignment = Alignment(wrapText=True) - else: - sheet["E" + str(index + 2)] = "None" - - excel_file = os.path.join(path, "results.xlsx") - workbook.save(excel_file) - - -def _write_sheet_row(sheet: Worksheet, column: int, description: str, passed: str, reason: str) -> None: - sheet["A" + str(column + 2)] = description - sheet["B" + str(column + 2)] = passed - sheet["C" + str(column + 2)] = reason - - -def write_result_to_excel(check_results: list["CheckResultPackage.CheckResult"], path: str): - # Create a new Excel workbook - workbook: Workbook = Workbook() - - workbook.create_sheet("Summary", 0) - sheet_summary = workbook.get_sheet_by_name("Summary") - sheet_summary["A1"] = "Total Tests" - sheet_summary["B1"] = "Passed Tests" - sheet_summary["C1"] = "Failed" - - _write_sheet_row( - sheet_summary, - 0, - str(len(check_results)), - str(len(list(filter(lambda x: x.passed, check_results)))), - str(len(list(filter(lambda x: not x.passed, check_results)))), - ) - - # Select the active sheet - workbook.create_sheet("All", 1) - sheet_all = workbook.get_sheet_by_name("All") - sheet_all["A1"] = "Tests Description" - sheet_all["B1"] = "Passed" - sheet_all["C1"] = "Reason" - - workbook.create_sheet("Failed", 2) - sheet_failed = workbook.get_sheet_by_name("Failed") - sheet_failed["A1"] = "Tests Description" - sheet_failed["B1"] = "Passed" - sheet_failed["C1"] = "Reason" - - failed_index = 0 - for index, check_result in enumerate(check_results): - if not check_result.passed: - _write_sheet_row( - sheet_failed, - failed_index, - check_result.description, - str(check_result.passed), - check_result.reason, - ) - failed_index += 1 - _write_sheet_row( - sheet_all, index, check_result.description, str(check_result.passed), check_result.reason - ) - workbook.save(os.path.join(path, f"{os.path.basename(path)}_result.xlsx")) - - -def reverse_dictionary(dictionary: dict): - reversed_dict = {} - for k, values in dictionary.items(): - for v in values: - reversed_dict[v] = reversed_dict.get(v, []) + [k] - return reversed_dict - - -def load_routes_from_expected(expected_routes: list) -> dict[str, set]: - routes = {} - for route in expected_routes: - if type(route) is list: - routes[route[0]] = set(route[1]) - else: - routes[route] = set() - return routes - - -def load_routes_from_ip_route(ip_route_output: list) -> dict[str, set]: - routes = {} - for route in ip_route_output: - - dst = route["dst"] - if dst == "default": - dst = "0.0.0.0/0" - nexthops = None - if "nexthops" in route: - nexthops = list(map(lambda x: x["dev"], route["nexthops"])) - if "gateway" in route: - nexthops = [route["gateway"]] - routes[dst] = set(nexthops) if nexthops else set() - return routes diff --git a/src/kathara_lab_checker/TestCollector.py b/src/lab_checker/TestCollector.py similarity index 100% rename from src/kathara_lab_checker/TestCollector.py rename to src/lab_checker/TestCollector.py diff --git a/src/kathara_lab_checker/TqdmLoggingHandler.py b/src/lab_checker/TqdmLoggingHandler.py similarity index 99% rename from src/kathara_lab_checker/TqdmLoggingHandler.py rename to src/lab_checker/TqdmLoggingHandler.py index c2fea27..438fc85 100644 --- a/src/kathara_lab_checker/TqdmLoggingHandler.py +++ b/src/lab_checker/TqdmLoggingHandler.py @@ -1,4 +1,5 @@ import logging + import tqdm diff --git a/src/kathara_lab_checker/__init__.py b/src/lab_checker/__init__.py similarity index 100% rename from src/kathara_lab_checker/__init__.py rename to src/lab_checker/__init__.py diff --git a/src/kathara_lab_checker/checks/AbstractCheck.py b/src/lab_checker/checks/AbstractCheck.py similarity index 100% rename from src/kathara_lab_checker/checks/AbstractCheck.py rename to src/lab_checker/checks/AbstractCheck.py diff --git a/src/kathara_lab_checker/checks/BridgeCheck.py b/src/lab_checker/checks/BridgeCheck.py similarity index 88% rename from src/kathara_lab_checker/checks/BridgeCheck.py rename to src/lab_checker/checks/BridgeCheck.py index 0e5ea4d..39b3232 100644 --- a/src/kathara_lab_checker/checks/BridgeCheck.py +++ b/src/lab_checker/checks/BridgeCheck.py @@ -1,9 +1,10 @@ import json + from Kathara.exceptions import MachineNotRunningError from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.utils import get_output +from lab_checker.utils import get_output from .AbstractCheck import AbstractCheck from .CheckResult import CheckResult @@ -22,8 +23,8 @@ def get_inteface_by_vni(interface_vni: str, interfaces: list[dict]): return list( filter( lambda x: "linkinfo" in x - and "id" in x["linkinfo"]["info_data"] - and x["linkinfo"]["info_data"]["id"] == int(interface_vni), + and "id" in x["linkinfo"]["info_data"] + and x["linkinfo"]["info_data"]["id"] == int(interface_vni), interfaces, ) ).pop() @@ -31,7 +32,7 @@ def get_inteface_by_vni(interface_vni: str, interfaces: list[dict]): class BridgeCheck(AbstractCheck): def check_bridge_interfaces( - self, device_name: str, expected_interfaces: list[str], actual_interfaces: list[dict] + self, device_name: str, expected_interfaces: list[str], actual_interfaces: list[dict] ) -> (CheckResult, set[str]): self.description = ( f"Checking that interfaces {expected_interfaces} " @@ -90,8 +91,8 @@ def check_vlan_filtering(self, device_name: str, bridge_info: dict) -> CheckResu f"Checking if VLAN filtering is enabled on `{bridge_info['ifname']}` of `{device_name}`" ) if ( - "vlan_filtering" in bridge_info["linkinfo"]["info_data"] - and bridge_info["linkinfo"]["info_data"]["vlan_filtering"] == 1 + "vlan_filtering" in bridge_info["linkinfo"]["info_data"] + and bridge_info["linkinfo"]["info_data"]["vlan_filtering"] == 1 ): return CheckResult(self.description, True, "OK") else: @@ -102,11 +103,11 @@ def check_vlan_filtering(self, device_name: str, bridge_info: dict) -> CheckResu ) def check_vlan_tags( - self, - device_name: str, - interface_name: str, - interface_configuration: dict, - actual_interface_vlan: dict, + self, + device_name: str, + interface_name: str, + interface_configuration: dict, + actual_interface_vlan: dict, ): self.description = ( f"Checking that vlans `{interface_configuration['vlan_tags']}` " @@ -124,7 +125,7 @@ def check_vlan_tags( return CheckResult(self.description, False, reason) def check_vxlan_pvid( - self, device_name: str, vni: str, pvid: str, actual_interfaces: list[dict], vlans_info: list[dict] + self, device_name: str, vni: str, pvid: str, actual_interfaces: list[dict], vlans_info: list[dict] ): self.description = f"Checking that `{device_name}` manages VNI `{vni}` with PVID `{pvid}`" @@ -151,7 +152,7 @@ def check_vxlan_pvid( return CheckResult(self.description, False, f"VNI `{vni}` not found on `{device_name}`") def check_vlan_pvid( - self, device_name: str, interface_name: str, interface_pvid: str, actual_interface_vlan: dict + self, device_name: str, interface_name: str, interface_pvid: str, actual_interface_vlan: dict ): self.description = f"Checking that `{interface_name}` of `{device_name}` has pvid {interface_pvid}" pvid = set( @@ -215,13 +216,11 @@ def run(self, devices_to_bridge_configuration: dict[str, list[dict]], lab: Lab) device_name, expected_bridge_interfaces, actual_interfaces ) results.append(check_result) - self.logger.info(check_result) if check_result.passed: check_result = self.check_vlan_filtering( device_name, get_interface_by_name(masters.pop(), actual_interfaces) ) - self.logger.info(check_result) results.append(check_result) for interface_name, interface_conf in bridge_conf["interfaces"].items(): @@ -230,7 +229,6 @@ def run(self, devices_to_bridge_configuration: dict[str, list[dict]], lab: Lab) try: actual_interface_vlans = get_interface_by_name(interface_name, actual_vlans) check_result = CheckResult(description, True, "OK") - self.logger.info(check_result) results.append(check_result) except IndexError: check_result = CheckResult( @@ -238,7 +236,6 @@ def run(self, devices_to_bridge_configuration: dict[str, list[dict]], lab: Lab) False, f"No VLAN found for for `{interface_name}` on `{device_name}`", ) - self.logger.info(check_result) results.append(check_result) if actual_interface_vlans: @@ -247,7 +244,6 @@ def run(self, devices_to_bridge_configuration: dict[str, list[dict]], lab: Lab) device_name, interface_name, interface_conf, actual_interface_vlans ) results.append(check_result) - self.logger.info(check_result) if "pvid" in interface_conf: check_result = self.check_vlan_pvid( @@ -257,7 +253,6 @@ def run(self, devices_to_bridge_configuration: dict[str, list[dict]], lab: Lab) actual_interface_vlans, ) results.append(check_result) - self.logger.info(check_result) if "vxlan" in bridge_conf: for vni, vni_info in bridge_conf["vxlan"].items(): @@ -265,5 +260,4 @@ def run(self, devices_to_bridge_configuration: dict[str, list[dict]], lab: Lab) device_name, vni, vni_info["pvid"], actual_interfaces, actual_vlans ) results.append(check_result) - self.logger.info(check_result) return results diff --git a/src/kathara_lab_checker/checks/CheckResult.py b/src/lab_checker/checks/CheckResult.py similarity index 71% rename from src/kathara_lab_checker/checks/CheckResult.py rename to src/lab_checker/checks/CheckResult.py index 563b94b..e858685 100644 --- a/src/kathara_lab_checker/checks/CheckResult.py +++ b/src/lab_checker/checks/CheckResult.py @@ -1,4 +1,6 @@ -from kathara_lab_checker.utils import green, red +import logging + +from lab_checker.utils import green, red class CheckResult: @@ -7,6 +9,7 @@ def __init__(self, description: str, passed: bool, reason: str) -> None: self.description: str = description self.passed: bool = passed self.reason: str = reason + logging.getLogger("kathara-lab-checker").info(self) def __str__(self) -> str: return f"{self.description}: {green(self.reason) if self.passed else red(self.reason)}" diff --git a/src/kathara_lab_checker/checks/CollisionDomainCheck.py b/src/lab_checker/checks/CollisionDomainCheck.py similarity index 92% rename from src/kathara_lab_checker/checks/CollisionDomainCheck.py rename to src/lab_checker/checks/CollisionDomainCheck.py index d313c28..afb51a9 100644 --- a/src/kathara_lab_checker/checks/CollisionDomainCheck.py +++ b/src/lab_checker/checks/CollisionDomainCheck.py @@ -27,6 +27,5 @@ def run(self, template_cds: list[Link], lab: Lab) -> list[CheckResult]: results = [] for cd_t in template_cds: check_result = self.check(cd_t, lab) - self.logger.info(check_result) results.append(check_result) - return results \ No newline at end of file + return results diff --git a/src/lab_checker/checks/CustomCommandCheck.py b/src/lab_checker/checks/CustomCommandCheck.py new file mode 100644 index 0000000..3fbadc4 --- /dev/null +++ b/src/lab_checker/checks/CustomCommandCheck.py @@ -0,0 +1,74 @@ +import logging +import re + +from Kathara.exceptions import MachineNotFoundError +from Kathara.manager.Kathara import Kathara +from Kathara.model.Lab import Lab + +from .AbstractCheck import AbstractCheck +from .CheckResult import CheckResult + + +class CustomCommandCheck(AbstractCheck): + + def check(self, device_name: str, command_entry: dict[str, str | int], lab: Lab) -> list[CheckResult]: + kathara_manager: Kathara = Kathara.get_instance() + + results = [] + try: + device = lab.get_machine(device_name) + stdout, stderr, exit_code = kathara_manager.exec( + machine_name=device.name, lab_hash=lab.hash, command=command_entry["command"], stream=False + ) + + stdout = stdout.decode("utf-8").strip() if stdout else stderr.decode("utf-8").strip() + + if "exit_code" in command_entry: + self.description = ( + f"Checking the exit code of the command '{command_entry['command']}' on '{device_name}'" + ) + if exit_code == command_entry["exit_code"]: + results.append(CheckResult(self.description, True, "OK")) + else: + reason = ( + f"The exit code of the command differs from the expected one." + f"\n Actual: {exit_code}\n Expected: {command_entry['exit_code']}" + ) + results.append(CheckResult(self.description, False, reason)) + + self.description = f"Checking the output of the command '{command_entry['command']}' on '{device_name}'" + if "output" in command_entry: + stdout = stdout.replace("\r\n", "\n").replace("\r", "\n") + command_entry["output"] = command_entry["output"].replace("\r\n", "\n").replace("\r", "\n") + + if stdout == command_entry["output"].replace("\r\n", "\n").replace("\r", "\n"): + results.append(CheckResult(self.description, True, "OK")) + else: + reason = ( + f"The output of the command differs from the expected one." + f"\n Actual: {stdout}\n Expected: {command_entry['output']}" + ) + results.append(CheckResult(self.description, False, reason)) + if "regex_match" in command_entry: + + if re.search(command_entry["regex_match"], stdout): + results.append(CheckResult(self.description, True, "OK")) + else: + reason = ( + f"The output of the command do not match the expected regex." + f"\n Actual: {stdout}\n Regex: {command_entry['regex_match']}" + ) + results.append(CheckResult(self.description, False, reason)) + + except MachineNotFoundError as e: + results.append(CheckResult(self.description, False, str(e))) + + return results + + def run(self, devices_to_commands: dict[str, list[dict[str, str | int]]], lab: Lab) -> list[CheckResult]: + results = [] + for device_name, command_entries in devices_to_commands.items(): + for command_entry in command_entries: + check_result = self.check(device_name, command_entry, lab) + results.extend(check_result) + return results diff --git a/src/kathara_lab_checker/checks/DaemonCheck.py b/src/lab_checker/checks/DaemonCheck.py similarity index 91% rename from src/kathara_lab_checker/checks/DaemonCheck.py rename to src/lab_checker/checks/DaemonCheck.py index 386e07d..2d60035 100644 --- a/src/kathara_lab_checker/checks/DaemonCheck.py +++ b/src/lab_checker/checks/DaemonCheck.py @@ -2,7 +2,7 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.utils import get_output +from lab_checker.utils import get_output from .AbstractCheck import AbstractCheck from .CheckResult import CheckResult @@ -39,6 +39,5 @@ def run(self, devices_to_daemons: dict[str, list[str]], lab: Lab) -> list[CheckR self.logger.info(f"Checking if daemons are running on `{device_name}`...") for daemon_name in daemons: check_result = self.check(device_name, daemon_name, lab) - self.logger.info(check_result) results.append(check_result) - return results \ No newline at end of file + return results diff --git a/src/kathara_lab_checker/checks/DeviceExistenceCheck.py b/src/lab_checker/checks/DeviceExistenceCheck.py similarity index 90% rename from src/kathara_lab_checker/checks/DeviceExistenceCheck.py rename to src/lab_checker/checks/DeviceExistenceCheck.py index 9b1862d..b33ad36 100644 --- a/src/kathara_lab_checker/checks/DeviceExistenceCheck.py +++ b/src/lab_checker/checks/DeviceExistenceCheck.py @@ -20,6 +20,5 @@ def run(self, template_machines: list[str], lab: Lab) -> list[CheckResult]: results = [] for device_name in template_machines: check_result = self.check(device_name, lab) - self.logger.info(check_result) results.append(check_result) - return results \ No newline at end of file + return results diff --git a/src/kathara_lab_checker/checks/IPv6EnabledCheck.py b/src/lab_checker/checks/IPv6EnabledCheck.py similarity index 84% rename from src/kathara_lab_checker/checks/IPv6EnabledCheck.py rename to src/lab_checker/checks/IPv6EnabledCheck.py index df5c04a..5b5a483 100644 --- a/src/kathara_lab_checker/checks/IPv6EnabledCheck.py +++ b/src/lab_checker/checks/IPv6EnabledCheck.py @@ -1,5 +1,4 @@ -from Kathara.exceptions import LinkNotFoundError, MachineNotFoundError -from Kathara.model import Link +from Kathara.exceptions import MachineNotFoundError from Kathara.model.Lab import Lab from .AbstractCheck import AbstractCheck @@ -23,6 +22,5 @@ def run(self, ipv6_devices: list[str], lab: Lab) -> list[CheckResult]: results = [] for device_name in ipv6_devices: check_result = self.check(device_name, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/InterfaceIPCheck.py b/src/lab_checker/checks/InterfaceIPCheck.py similarity index 79% rename from src/kathara_lab_checker/checks/InterfaceIPCheck.py rename to src/lab_checker/checks/InterfaceIPCheck.py index 72b4dca..aaa0ffb 100644 --- a/src/kathara_lab_checker/checks/InterfaceIPCheck.py +++ b/src/lab_checker/checks/InterfaceIPCheck.py @@ -3,7 +3,7 @@ from Kathara.exceptions import MachineNotRunningError from Kathara.model.Lab import Lab -from kathara_lab_checker.utils import get_interfaces_addresses +from lab_checker.utils import get_interfaces_addresses from .AbstractCheck import AbstractCheck from .CheckResult import CheckResult @@ -11,14 +11,13 @@ class InterfaceIPCheck(AbstractCheck): def check(self, device_name: str, interface_number: int, ip: str, dumped_iface: dict) -> CheckResult: - interface_name = f"eth{interface_number}" + interface_name = f"eth{interface_number}" if interface_number.isnumeric() else interface_number self.description = f"Verifying the IP address ({ip}) assigned to {interface_name} of {device_name}" try: iface_info = next(filter(lambda x: x["ifname"] == f"{interface_name}", dumped_iface)) except StopIteration: - return CheckResult(self.description, False, - f"Interface `{interface_name}` not found on `{device_name}`") + return CheckResult(self.description, False, f"Interface `{interface_name}` not found on `{device_name}`") ip_address = ipaddress.ip_interface(ip) @@ -35,8 +34,10 @@ def check(self, device_name: str, interface_number: int, ip: str, dumped_iface: reason = f"The IP address has a wrong netmask ({prefix_len})" return CheckResult(self.description, False, reason) - reason = (f"The interface `{iface_info['ifname']}` of `{device_name}` " - f"has the following IP addresses: {assigned_ips}`.") + reason = ( + f"The interface `{iface_info['ifname']}` of `{device_name}` " + f"has the following IP addresses: {assigned_ips}`." + ) return CheckResult(self.description, False, reason) def run(self, ip_mapping: dict, lab: Lab) -> list[CheckResult]: @@ -47,7 +48,6 @@ def run(self, ip_mapping: dict, lab: Lab) -> list[CheckResult]: dumped_iface = get_interfaces_addresses(device_name, lab) for interface_number, ip in iface_to_ips.items(): check_result = self.check(device_name, interface_number, ip, dumped_iface) - self.logger.info(check_result) results.append(check_result) except MachineNotRunningError: self.logger.warning(f"`{device_name}` is not running. Skipping checks...") diff --git a/src/lab_checker/checks/KernelRouteCheck.py b/src/lab_checker/checks/KernelRouteCheck.py new file mode 100644 index 0000000..0b5b052 --- /dev/null +++ b/src/lab_checker/checks/KernelRouteCheck.py @@ -0,0 +1,159 @@ +import ipaddress +import json +from typing import Union, Any + +from Kathara.exceptions import MachineNotRunningError +from Kathara.manager.Kathara import Kathara +from Kathara.model.Lab import Lab + +from .AbstractCheck import AbstractCheck +from .CheckResult import CheckResult + + +def load_routes_from_expected(expected_routes: list) -> dict[str, set]: + routes = {} + for route in expected_routes: + if type(route) is list: + routes[route[0]] = set(route[1]) + else: + routes[route] = set() + return routes + + +def get_kernel_routes(device_name: str, lab: Lab) -> list[dict[str, Any]]: + kathara_manager = Kathara.get_instance() + try: + stdout, _, _ = kathara_manager.exec(machine_name=device_name, lab_hash=lab.hash, command="ip -j route", + stream=False) + stdout = stdout.decode("utf-8").strip() + except MachineNotRunningError: + return [] + return json.loads(stdout) + + +def get_nexthops(device_name: str, lab: Lab) -> list[dict[str, Any]]: + kathara_manager = Kathara.get_instance() + + try: + stdout, _, _ = kathara_manager.exec(machine_name=device_name, lab_hash=lab.hash, command="ip -j nexthop", + stream=False) + stdout = stdout.decode("utf-8").strip() + except MachineNotRunningError: + return [] + + return json.loads(stdout) + + +def load_routes_from_device(device_name: str, lab: Lab) -> dict[str, set]: + ip_route_output = get_kernel_routes(device_name, lab) + routes = {} + kernel_nexthops = None + + for route in ip_route_output: + + dst = route["dst"] + if dst == "default": + dst = "0.0.0.0/0" + nexthops = None + if "scope" in route and route["scope"] == "link": + nexthops = [("d.c.", route["dev"])] + elif "nexthops" in route: + nexthops = list(map(lambda x: x["dev"], route["nexthops"])) + elif "gateway" in route: + nexthops = [(route["gateway"], route["dev"])] + elif "via" in route: + nexthops = [(route["via"]["host"], route["dev"])] + elif "nhid" in route: + # Lazy load nexthops + kernel_nexthops = get_nexthops(device_name, lab) if kernel_nexthops is None else kernel_nexthops + + current_nexthop = [obj for obj in kernel_nexthops if obj["id"] == route["nhid"]][0] + if "gateway" in current_nexthop: + nexthops = [(current_nexthop["gateway"], current_nexthop["dev"])] + elif "group" in current_nexthop: + nexthops = [ + (obj["gateway"], obj["dev"]) + for obj in kernel_nexthops + if obj["id"] in (nhid["id"] for nhid in current_nexthop["group"]) + ] + else: + raise Exception("Strange nexthop: ", current_nexthop) + routes[dst] = set(nexthops) + return routes + + +def is_valid_ip(ip_str): + try: + ipaddress.ip_address(ip_str) + return True + except ValueError: + return False + + +class KernelRouteCheck(AbstractCheck): + def check(self, device_name: str, expected_routing_table: list, lab: Lab) -> list[CheckResult]: + self.description = f"Checking the routing table of {device_name}" + actual_routing_table = dict( + filter( + lambda item: not any("d.c." in elem for elem in item[1]), + load_routes_from_device(device_name, lab).items(), + ) + ) + expected_routing_table = load_routes_from_expected(expected_routing_table) + + results = [] + + if len(expected_routing_table) != len(actual_routing_table): + check_result = CheckResult( + self.description, + False, + f"The routing table of {device_name} have the wrong number of routes: {len(actual_routing_table)}, expected: {len(expected_routing_table)}", + ) + results.append(check_result) + return results + + for dst, nexthops in expected_routing_table.items(): + if not dst in actual_routing_table: + check_result = CheckResult( + self.description, False, f"The routing table of {device_name} is missing route {dst}" + ) + results.append(check_result) + continue + if nexthops: + actual_nh = actual_routing_table[dst] + if len(nexthops) != len(actual_nh): + check_result = CheckResult( + self.description, + False, + f"The routing table of {device_name} about route {dst} have the wrong number of next-hops: {len(actual_nh)}, expected: {len(nexthops)}", + ) + results.append(check_result) + continue + for nh in nexthops: + valid_ip = is_valid_ip(nh) + if (valid_ip and not any(item[0] == nh for item in actual_nh)) or ( + not valid_ip and not any(item[1] == nh for item in actual_nh) + ): + check_result = CheckResult( + self.description, + False, + f"The routing table of {device_name} about route {dst} does not contain next-hop: {nh}, actual: {actual_nh}", + ) + results.append(check_result) + + if not results: + check_result = CheckResult(self.description, True, f"OK") + results.append(check_result) + + return results + + def run(self, devices_to_routes: dict[str, list[Union[str, list[str]]]], lab: Lab) -> list[CheckResult]: + results = [] + for device_name, expected_routes in devices_to_routes.items(): + self.logger.info(f"Checking kernel routes for `{device_name}`...") + try: + check_result = self.check(device_name, expected_routes, lab) + results = results + check_result + except MachineNotRunningError: + self.logger.warning(f"`{device_name}` is not running. Skipping checks...") + return results diff --git a/src/kathara_lab_checker/checks/ReachabilityCheck.py b/src/lab_checker/checks/ReachabilityCheck.py similarity index 83% rename from src/kathara_lab_checker/checks/ReachabilityCheck.py rename to src/lab_checker/checks/ReachabilityCheck.py index 2dc1d39..212b688 100644 --- a/src/kathara_lab_checker/checks/ReachabilityCheck.py +++ b/src/lab_checker/checks/ReachabilityCheck.py @@ -2,7 +2,7 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.utils import get_output +from lab_checker.utils import get_output from .AbstractCheck import AbstractCheck from .CheckResult import CheckResult @@ -32,12 +32,14 @@ def check(self, device_name: str, destination: str, lab: Lab) -> CheckResult: output = get_output(exec_output_gen).replace("ERROR: ", "") try: - parsed_output = jc.parse("ping", output) - if int(parsed_output['packets_received']) > 0: + parsed_output = jc.parse("ping", output, quiet=True) + if int(parsed_output["packets_received"]) > 0: reason = f"`{device_name}` can reach `{destination}`." if invert else "OK" return CheckResult(self.description, invert ^ True, reason) else: - reason = "OK" if invert else f"`{device_name}` does not receive any answer from `{destination}`." + reason = ( + "OK" if invert else f"`{device_name}` does not receive any answer from `{destination}`." + ) return CheckResult(self.description, invert ^ False, reason) except Exception: return CheckResult(self.description, False, output.strip()) @@ -47,6 +49,5 @@ def run(self, devices_to_destinations: dict[str, list[str]], lab: Lab) -> list[C for device_name, destinations in devices_to_destinations.items(): for destination in destinations: check_result = self.check(device_name, destination, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/StartupExistenceCheck.py b/src/lab_checker/checks/StartupExistenceCheck.py similarity index 92% rename from src/kathara_lab_checker/checks/StartupExistenceCheck.py rename to src/lab_checker/checks/StartupExistenceCheck.py index 7467408..b7b632b 100644 --- a/src/kathara_lab_checker/checks/StartupExistenceCheck.py +++ b/src/lab_checker/checks/StartupExistenceCheck.py @@ -18,6 +18,5 @@ def run(self, machines_to_check: list[str], lab: Lab) -> list[CheckResult]: results = [] for device_name in machines_to_check: check_result = self.check(device_name, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/SysctlCheck.py b/src/lab_checker/checks/SysctlCheck.py similarity index 91% rename from src/kathara_lab_checker/checks/SysctlCheck.py rename to src/lab_checker/checks/SysctlCheck.py index be2a3be..aa0ba42 100644 --- a/src/kathara_lab_checker/checks/SysctlCheck.py +++ b/src/lab_checker/checks/SysctlCheck.py @@ -1,5 +1,4 @@ -from Kathara.exceptions import LinkNotFoundError, MachineNotFoundError -from Kathara.model import Link +from Kathara.exceptions import MachineNotFoundError from Kathara.model.Lab import Lab from .AbstractCheck import AbstractCheck @@ -34,6 +33,5 @@ def run(self, devices_sysctls: dict[str, list[str]], lab: Lab) -> list[CheckResu for device_name, sysctls in devices_sysctls.items(): for sysctl in sysctls: check_result = self.check(device_name, sysctl, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/__init__.py b/src/lab_checker/checks/__init__.py similarity index 100% rename from src/kathara_lab_checker/checks/__init__.py rename to src/lab_checker/checks/__init__.py diff --git a/src/kathara_lab_checker/checks/applications/__init__.py b/src/lab_checker/checks/applications/__init__.py similarity index 100% rename from src/kathara_lab_checker/checks/applications/__init__.py rename to src/lab_checker/checks/applications/__init__.py diff --git a/src/kathara_lab_checker/checks/applications/dns/DNSAuthorityCheck.py b/src/lab_checker/checks/applications/dns/DNSAuthorityCheck.py similarity index 85% rename from src/kathara_lab_checker/checks/applications/dns/DNSAuthorityCheck.py rename to src/lab_checker/checks/applications/dns/DNSAuthorityCheck.py index 203a0dd..b58780c 100644 --- a/src/kathara_lab_checker/checks/applications/dns/DNSAuthorityCheck.py +++ b/src/lab_checker/checks/applications/dns/DNSAuthorityCheck.py @@ -5,14 +5,14 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output, find_lines_with_string, find_device_name_from_ip +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output, find_lines_with_string, find_device_name_from_ip class DNSAuthorityCheck(AbstractCheck): def check( - self, domain: str, authority_ip: str, device_name: str, device_ip: str, lab: Lab + self, domain: str, authority_ip: str, device_name: str, device_ip: str, lab: Lab ) -> CheckResult: self.description = ( f"Checking on `{device_name}` that `{authority_ip}` is the authority for domain `{domain}`" @@ -80,18 +80,17 @@ def check( return CheckResult(self.description, False, reason) def run( - self, - zone_to_authoritative_ips: dict[str, list[str]], - local_nameservers: list[str], - ip_mapping: dict[str, dict[str, str]], - lab: Lab, + self, + zone_to_authoritative_ips: dict[str, list[str]], + local_nameservers: list[str], + ip_mapping: dict[str, dict[str, str]], + lab: Lab, ) -> list[CheckResult]: results = [] for domain, name_servers in zone_to_authoritative_ips.items(): self.logger.info(f"Checking authority ip for domain `{domain}`") for ns in name_servers: check_result = self.check(domain, ns, find_device_name_from_ip(ip_mapping, ns), ns, lab) - self.logger.info(check_result) results.append(check_result) if domain == ".": @@ -106,13 +105,11 @@ def run( generic_ns_ip, lab, ) - self.logger.info(check_result) results.append(check_result) for local_ns in local_nameservers: check_result = self.check( domain, ns, find_device_name_from_ip(ip_mapping, local_ns), local_ns, lab ) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/lab_checker/checks/applications/dns/DNSRecordCheck.py b/src/lab_checker/checks/applications/dns/DNSRecordCheck.py new file mode 100644 index 0000000..20b4db2 --- /dev/null +++ b/src/lab_checker/checks/applications/dns/DNSRecordCheck.py @@ -0,0 +1,39 @@ +from Kathara.manager.Kathara import Kathara +from Kathara.model.Lab import Lab + +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output + + +class DNSRecordCheck(AbstractCheck): + + def run( + self, + records: dict[str, dict[str, list[str]]], + machines_with_dns: list[str], + lab: Lab, + ) -> list[CheckResult]: + results = [] + kathara_manager: Kathara = Kathara.get_instance() + + for recordtype, recordvalue in records.items(): + for record, addresses in recordvalue.items(): + for client in machines_with_dns: + exec_output_gen = kathara_manager.exec( + machine_name=client, + command=f"dig +short {recordtype} {record}", + lab_hash=lab.hash, + ) + ip = get_output(exec_output_gen).strip() + if ip in addresses: + check_result = CheckResult("Checking correctness of DNS records", True, "OK") + else: + check_result = CheckResult( + "Checking correctness of DNS records", + False, + f"{client} resolve {recordtype} {record} with IP {ip} instead of {addresses}", + ) + check_result + results.append(check_result) + return results diff --git a/src/kathara_lab_checker/checks/applications/dns/LocalNSCheck.py b/src/lab_checker/checks/applications/dns/LocalNSCheck.py similarity index 84% rename from src/kathara_lab_checker/checks/applications/dns/LocalNSCheck.py rename to src/lab_checker/checks/applications/dns/LocalNSCheck.py index 7e4f5c6..ebc47ec 100644 --- a/src/kathara_lab_checker/checks/applications/dns/LocalNSCheck.py +++ b/src/lab_checker/checks/applications/dns/LocalNSCheck.py @@ -3,9 +3,9 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output class LocalNSCheck(AbstractCheck): @@ -38,6 +38,5 @@ def run(self, local_nameservers_to_devices: dict[str, list[str]], lab: Lab) -> l for local_ns, managed_devices in local_nameservers_to_devices.items(): for device_name in managed_devices: check_result = self.check(local_ns, device_name, lab) - self.logger.info(check_result) results.append(check_result) - return results \ No newline at end of file + return results diff --git a/src/kathara_lab_checker/checks/applications/dns/__init__.py b/src/lab_checker/checks/applications/dns/__init__.py similarity index 100% rename from src/kathara_lab_checker/checks/applications/dns/__init__.py rename to src/lab_checker/checks/applications/dns/__init__.py diff --git a/src/kathara_lab_checker/checks/protocols/AnnouncedNetworkCheck.py b/src/lab_checker/checks/protocols/AnnouncedNetworkCheck.py similarity index 84% rename from src/kathara_lab_checker/checks/protocols/AnnouncedNetworkCheck.py rename to src/lab_checker/checks/protocols/AnnouncedNetworkCheck.py index e665cbf..e6a1486 100644 --- a/src/kathara_lab_checker/checks/protocols/AnnouncedNetworkCheck.py +++ b/src/lab_checker/checks/protocols/AnnouncedNetworkCheck.py @@ -3,9 +3,9 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output class AnnouncedNetworkCheck(AbstractCheck): @@ -34,6 +34,5 @@ def run(self, protocol: str, devices_to_networks: dict[str, list[str]], lab: Lab self.logger.info(f"Checking {device_name} BGP announces...") for network in networks: check_result = self.check(device_name, protocol, network, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/protocols/ProtocolRedistributionCheck.py b/src/lab_checker/checks/protocols/ProtocolRedistributionCheck.py similarity index 87% rename from src/kathara_lab_checker/checks/protocols/ProtocolRedistributionCheck.py rename to src/lab_checker/checks/protocols/ProtocolRedistributionCheck.py index 6e4e8fd..a4857ff 100644 --- a/src/kathara_lab_checker/checks/protocols/ProtocolRedistributionCheck.py +++ b/src/lab_checker/checks/protocols/ProtocolRedistributionCheck.py @@ -3,9 +3,9 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output class ProtocolRedistributionCheck(AbstractCheck): @@ -47,6 +47,5 @@ def run(self, protocol, devices_to_redistributed: dict[str, list[str]], lab: Lab for device_name, injected_protocols in devices_to_redistributed.items(): for injected_protocol in injected_protocols: check_result = self.check(device_name, protocol, injected_protocol, lab) - self.logger.info(check_result) results.append(check_result) - return results \ No newline at end of file + return results diff --git a/src/kathara_lab_checker/checks/protocols/__init__.py b/src/lab_checker/checks/protocols/__init__.py similarity index 100% rename from src/kathara_lab_checker/checks/protocols/__init__.py rename to src/lab_checker/checks/protocols/__init__.py diff --git a/src/kathara_lab_checker/checks/protocols/bgp/BGPPeeringCheck.py b/src/lab_checker/checks/protocols/bgp/BGPPeeringCheck.py similarity index 88% rename from src/kathara_lab_checker/checks/protocols/bgp/BGPPeeringCheck.py rename to src/lab_checker/checks/protocols/bgp/BGPPeeringCheck.py index 525126c..eabeb5c 100644 --- a/src/kathara_lab_checker/checks/protocols/bgp/BGPPeeringCheck.py +++ b/src/lab_checker/checks/protocols/bgp/BGPPeeringCheck.py @@ -4,9 +4,9 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output class BGPPeeringCheck(AbstractCheck): @@ -47,6 +47,5 @@ def run(self, device_to_neighbours: dict[str, list[str]], lab: Lab) -> list[Chec for neighbor in neighbors: self.description = f"{device_name} has bgp peer {neighbor}" check_result = self.check(device_name, neighbor, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/protocols/bgp/__init__.py b/src/lab_checker/checks/protocols/bgp/__init__.py similarity index 100% rename from src/kathara_lab_checker/checks/protocols/bgp/__init__.py rename to src/lab_checker/checks/protocols/bgp/__init__.py diff --git a/src/kathara_lab_checker/checks/protocols/evpn/AnnouncedVNICheck.py b/src/lab_checker/checks/protocols/evpn/AnnouncedVNICheck.py similarity index 83% rename from src/kathara_lab_checker/checks/protocols/evpn/AnnouncedVNICheck.py rename to src/lab_checker/checks/protocols/evpn/AnnouncedVNICheck.py index 895b3f3..5413258 100644 --- a/src/kathara_lab_checker/checks/protocols/evpn/AnnouncedVNICheck.py +++ b/src/lab_checker/checks/protocols/evpn/AnnouncedVNICheck.py @@ -3,9 +3,9 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output class AnnouncedVNICheck(AbstractCheck): @@ -42,14 +42,12 @@ def check(self, device_name: str, lab: Lab, invert: bool = False) -> CheckResult def run(self, device_to_vnis_info: dict[str, dict], evpn_devices: list[str], lab: Lab) -> list[CheckResult]: results = [] for device_name in device_to_vnis_info.keys(): - check_result = self.check(device_name, lab) - self.logger.info(check_result) + check_result = self.check(device_name, lab) results.append(check_result) not_advertise = set(evpn_devices).difference(set(device_to_vnis_info.keys())) for device_name in not_advertise: check_result = self.check(device_name, lab, invert=True) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/protocols/evpn/EVPNSessionCheck.py b/src/lab_checker/checks/protocols/evpn/EVPNSessionCheck.py similarity index 91% rename from src/kathara_lab_checker/checks/protocols/evpn/EVPNSessionCheck.py rename to src/lab_checker/checks/protocols/evpn/EVPNSessionCheck.py index 4686ed0..e16efae 100644 --- a/src/kathara_lab_checker/checks/protocols/evpn/EVPNSessionCheck.py +++ b/src/lab_checker/checks/protocols/evpn/EVPNSessionCheck.py @@ -4,9 +4,9 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output class EVPNSessionCheck(AbstractCheck): @@ -54,6 +54,5 @@ def run(self, device_to_neighbours: dict[str, list[str]], lab: Lab) -> list[Chec for neighbor in neighbors: self.description = f"{device_name} has bgp peer {neighbor}" check_result = self.check(device_name, neighbor, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/protocols/evpn/VTEPCheck.py b/src/lab_checker/checks/protocols/evpn/VTEPCheck.py similarity index 90% rename from src/kathara_lab_checker/checks/protocols/evpn/VTEPCheck.py rename to src/lab_checker/checks/protocols/evpn/VTEPCheck.py index 310cb35..7090c40 100644 --- a/src/kathara_lab_checker/checks/protocols/evpn/VTEPCheck.py +++ b/src/lab_checker/checks/protocols/evpn/VTEPCheck.py @@ -4,9 +4,9 @@ from Kathara.manager.Kathara import Kathara from Kathara.model.Lab import Lab -from kathara_lab_checker.checks.AbstractCheck import AbstractCheck -from kathara_lab_checker.checks.CheckResult import CheckResult -from kathara_lab_checker.utils import get_output +from lab_checker.checks.AbstractCheck import AbstractCheck +from lab_checker.checks.CheckResult import CheckResult +from lab_checker.utils import get_output class VTEPCheck(AbstractCheck): @@ -44,6 +44,5 @@ def run(self, device_to_vnis_info: dict[str, dict], lab: Lab) -> list[CheckResul for vni in vnis: self.description = f"Checking that `{device_name}` VTEP has vni `{vni}` with VTEP IP `{vtep_ip}`" check_result = self.check(device_name, vni, vtep_ip, lab) - self.logger.info(check_result) results.append(check_result) return results diff --git a/src/kathara_lab_checker/checks/protocols/evpn/__init__.py b/src/lab_checker/checks/protocols/evpn/__init__.py similarity index 100% rename from src/kathara_lab_checker/checks/protocols/evpn/__init__.py rename to src/lab_checker/checks/protocols/evpn/__init__.py diff --git a/src/lab_checker/excel_utils.py b/src/lab_checker/excel_utils.py new file mode 100644 index 0000000..8bebc24 --- /dev/null +++ b/src/lab_checker/excel_utils.py @@ -0,0 +1,93 @@ +import os + +from openpyxl import Workbook +from openpyxl.styles import Alignment +from openpyxl.worksheet.worksheet import Worksheet + + +def write_final_results_to_excel(test_collector: "TestCollectorPackage.TestCollector", path: str): + # Create a new Excel workbook + workbook = Workbook() + + # Select the active sheet + sheet = workbook.active + + sheet["A1"] = "Student Name" + sheet["B1"] = "Tests Passed" + sheet["C1"] = "Tests Failed" + sheet["D1"] = "Tests Total Number" + sheet["E1"] = "Problems" + + for index, (test_name, test_results) in enumerate(test_collector.tests.items()): + failed_tests = test_collector.get_failed(test_name) + passed_tests = test_collector.get_passed(test_name) + sheet["A" + str(index + 2)] = test_name + sheet["B" + str(index + 2)] = len(passed_tests) + sheet["C" + str(index + 2)] = len(failed_tests) + sheet["D" + str(index + 2)] = len(test_results) + + if failed_tests: + failed_string = "" + for idx, failed in enumerate(failed_tests): + failed_string += f"{(idx + 1)}: {failed.reason}\n" + if len(failed_string) >= 32767: + raise Exception("ERROR: Excel cell too big") + sheet["E" + str(index + 2)] = failed_string + sheet["E" + str(index + 2)].alignment = Alignment(wrapText=True) + else: + sheet["E" + str(index + 2)] = "None" + + excel_file = os.path.join(path, "results.xlsx") + workbook.save(excel_file) + + +def _write_sheet_row(sheet: Worksheet, column: int, description: str, passed: str, reason: str) -> None: + sheet["A" + str(column + 2)] = description + sheet["B" + str(column + 2)] = passed + sheet["C" + str(column + 2)] = reason + + +def write_result_to_excel(check_results: list["CheckResultPackage.CheckResult"], path: str): + # Create a new Excel workbook + workbook: Workbook = Workbook() + + workbook.create_sheet("Summary", 0) + sheet_summary = workbook.get_sheet_by_name("Summary") + sheet_summary["A1"] = "Total Tests" + sheet_summary["B1"] = "Passed Tests" + sheet_summary["C1"] = "Failed" + + _write_sheet_row( + sheet_summary, + 0, + str(len(check_results)), + str(len(list(filter(lambda x: x.passed, check_results)))), + str(len(list(filter(lambda x: not x.passed, check_results)))), + ) + + # Select the active sheet + workbook.create_sheet("All", 1) + sheet_all = workbook.get_sheet_by_name("All") + sheet_all["A1"] = "Tests Description" + sheet_all["B1"] = "Passed" + sheet_all["C1"] = "Reason" + + workbook.create_sheet("Failed", 2) + sheet_failed = workbook.get_sheet_by_name("Failed") + sheet_failed["A1"] = "Tests Description" + sheet_failed["B1"] = "Passed" + sheet_failed["C1"] = "Reason" + + failed_index = 0 + for index, check_result in enumerate(check_results): + if not check_result.passed: + _write_sheet_row( + sheet_failed, + failed_index, + check_result.description, + str(check_result.passed), + check_result.reason, + ) + failed_index += 1 + _write_sheet_row(sheet_all, index, check_result.description, str(check_result.passed), check_result.reason) + workbook.save(os.path.join(path, f"{os.path.basename(path)}_result.xlsx")) diff --git a/src/lab_checker/utils.py b/src/lab_checker/utils.py new file mode 100644 index 0000000..7105357 --- /dev/null +++ b/src/lab_checker/utils.py @@ -0,0 +1,80 @@ +import json +from typing import Optional + +from Kathara.manager.Kathara import Kathara +from Kathara.model.Lab import Lab + + +def red(s): + return f"\033[91m {s}\033[00m" + + +def green(s): + return f"\033[92m {s}\033[00m" + + +def yellow(s): + return f"\033[93m {s}\033[00m" + + +def get_output(exec_output): + output = "" + try: + while True: + (stdout, stderr) = next(exec_output) + stdout = stdout.decode("utf-8") if stdout else "" + stderr = stderr.decode("utf-8") if stderr else "" + + if stdout: + output += stdout + if stderr: + output += f"ERROR: {stderr}" + except StopIteration: + pass + return output + + +def get_interfaces_addresses(device_name: str, lab: Lab) -> dict: + kathara_manager = Kathara.get_instance() + + exec_output_gen = kathara_manager.exec( + machine_name=device_name, + command=f"ip -j address", + lab_hash=lab.hash, + ) + + return json.loads(get_output(exec_output_gen)) + + +def find_device_name_from_ip(ip_mapping: dict[str, dict[str, str]], ip_search: str) -> Optional[str]: + for device, ip_addresses in ip_mapping.items(): + for _, ip in ip_addresses.items(): + # Check if the base IP matches (ignoring the CIDR notation) + if ip.split("/")[0] == ip_search: + return device + raise Exception("Something is missing/wrong in the ip_mapping configuration!") + + +def find_lines_with_string(file_content, search_string): + """ + Returns lines from the provided multi-line string that contain the search string. + + :param file_content: A string representing the content of a file (multi-line string). + :param search_string: A string to search for in each line of the file content. + :return: A list of lines that contain the search string. + """ + # Splitting the string into lines + lines = file_content.split("\n") + + # Filtering lines that contain the search string + matching_lines = [line for line in lines if search_string in line] + + return matching_lines + + +def reverse_dictionary(dictionary: dict): + reversed_dict = {} + for k, values in dictionary.items(): + for v in values: + reversed_dict[v] = reversed_dict.get(v, []) + [k] + return reversed_dict