From 65aac1c851812318ad16a84ccac1a74b1181bac8 Mon Sep 17 00:00:00 2001 From: ms264556 <29752086+ms264556@users.noreply.github.com> Date: Sat, 27 Apr 2024 04:41:35 +0100 Subject: [PATCH] Add do_set_acl_members method --- aioruckus/ajaxsession.py | 4 +- aioruckus/const.py | 3 + aioruckus/nativeajaxapi.py | 621 ------------------------- aioruckus/ruckusajaxapi.py | 840 ++++++++++++++++++++++++++-------- aioruckus/ruckusapi.py | 5 + aioruckus/smartzoneajaxapi.py | 82 ++-- 6 files changed, 707 insertions(+), 848 deletions(-) delete mode 100644 aioruckus/nativeajaxapi.py diff --git a/aioruckus/ajaxsession.py b/aioruckus/ajaxsession.py index fe3417f..488223e 100644 --- a/aioruckus/ajaxsession.py +++ b/aioruckus/ajaxsession.py @@ -132,8 +132,8 @@ async def login(self) -> None: raise ConnectionRefusedError(ERROR_CONNECT_TEMPORARY) # pylint: disable=import-outside-toplevel - from .nativeajaxapi import NativeAjaxApi - self._api = NativeAjaxApi(self) + from .ruckusajaxapi import RuckusAjaxApi + self._api = RuckusAjaxApi(self) return self async def sz_login(self) -> None: diff --git a/aioruckus/const.py b/aioruckus/const.py index 4d5241d..83c4c00 100644 --- a/aioruckus/const.py +++ b/aioruckus/const.py @@ -18,6 +18,9 @@ ERROR_PASSPHRASE_MISSING = "WPA2 and Mixed WPA2/3 WLANs require a passphrase" ERROR_SAEPASSPHRASE_MISSING = "WPA3 and Mixed WPA2/3 WLANs require an SAE passphrase" ERROR_PASSPHRASE_NAME = "You must also provide a name if you wish to override the passphrase" +ERROR_ACL_NOT_FOUND = "ACL not found" +ERROR_ACL_TOO_BIG = "ACLs may only contain 128 stations" +ERROR_ACL_SYSTEM = "Please use do_block_client() and do_unblock_client() to modify the System ACL" class SystemStat(Enum): """Ruckus System Info section keys""" diff --git a/aioruckus/nativeajaxapi.py b/aioruckus/nativeajaxapi.py deleted file mode 100644 index 586ec8e..0000000 --- a/aioruckus/nativeajaxapi.py +++ /dev/null @@ -1,621 +0,0 @@ -"""Adds Unleashed / ZoneDirector AJAX Statistics and Command methods to RuckusApi""" - -from collections.abc import AsyncIterator -import datetime -import random -from re import IGNORECASE, match -from typing import Any, Dict, List -import xml.etree.ElementTree as ET -from xml.sax import saxutils -import xmltodict - -from aioruckus.ruckusajaxapi import RuckusAjaxApi - -from .const import ( - ERROR_INVALID_MAC, - ERROR_PASSPHRASE_LEN, - ERROR_PASSPHRASE_JS, - ERROR_PASSPHRASE_MISSING, - ERROR_SAEPASSPHRASE_MISSING, - ERROR_INVALID_WLAN, - ERROR_PASSPHRASE_NAME, - PatchNewAttributeMode, - SystemStat, - WlanEncryption -) -from .abcsession import ConfigItem -from .ajaxsession import AjaxSession - -class NativeAjaxApi(RuckusAjaxApi): - """Ruckus ZoneDirector or Unleashed Configuration, Statistics and Commands API""" - def __init__(self, session: AjaxSession): - super().__init__(session) - - async def get_system_info(self, *sections: SystemStat) -> dict: - sections = ( - [s for section_list in sections for s in section_list.value] - if sections else SystemStat.DEFAULT.value - ) - section = ''.join(f"<{s}/>" for s in sections) - sysinfo = await self.cmdstat( - # language=XML - f"{section}" - ) - return sysinfo.get("response", sysinfo.get("system")) - - async def get_active_clients(self, interval_stats: bool = False) -> List: - """Return a list of active clients""" - if interval_stats: - endtime = await self._get_timestamp_at_controller() - starttime = endtime - 86400 - clientrequest = f"" - else: - clientrequest = "" - return await self.cmdstat(f"{clientrequest}", ["client"]) - - async def get_inactive_clients(self) -> List: - """Return a list of inactive clients""" - return await self.cmdstat("", ["client"]) - - async def get_ap_stats(self) -> List: - """Return a list of AP statistics""" - return await self.cmdstat( - "" - "", ["ap"] - ) - - async def get_ap_group_stats(self) -> List: - """Return a list of AP group statistics""" - return await self.cmdstat( - "" - "", ["group", "radio", "ap"] - ) - - async def get_vap_stats(self) -> List: - """Return a list of Virtual AP (per-radio WLAN) statistics""" - return await self.cmdstat( - "" - "", ["vap"] - ) - - async def get_wlan_group_stats(self) -> List: - """Return a list of WLAN group statistics""" - return await self.cmdstat( - "" - "", ["wlangroup", "wlan"] - ) - - async def get_dpsk_stats(self) -> List: - """Return a list of AP group statistics""" - return await self.cmdstat( - "" - "", ["dpsk"] - ) - - async def get_active_rogues(self) -> list[dict]: - """Return a list of currently active rogue devices""" - return await self.cmdstat( - "" - "", ["rogue"] - ) - - async def get_known_rogues(self, limit: int = 300) -> list[dict]: - """Return a list of known/recognized rogues devices""" - return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "recognized": "true"}, updater="krogue", limit=limit)] - - async def get_blocked_rogues(self, limit: int = 300) -> list[dict]: - """Return a list of user blocked rogues devices""" - return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "blocked": "true"}, updater="brogue", limit=limit)] - - async def get_all_alarms(self, limit: int = 300) -> list[dict]: - """Return a list of all alerts""" - return [alarm async for alarm in self.cmdstat_piecewise("eventd", "alarm", updater="page", limit=limit)] - - async def get_all_events(self, limit: int = 300) -> list[dict]: - """Return a list of all events""" - return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", limit=limit)] - - async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]: - """Return a list of WLAN events""" - return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"wlan": list(wlan_ids) if wlan_ids else "*"}, limit=limit)] - - async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]: - """Return a list of AP events""" - return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"ap": list(self._normalize_mac(mac) for mac in ap_macs) if ap_macs else "*"}, limit=limit)] - - async def get_client_events(self, limit: int = 300) -> list[dict]: - """Return a list of client events""" - return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "user"}, limit=limit)] - - async def get_wired_client_events(self, limit: int = 300) -> list[dict]: - """Return a list of wired client events""" - return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "wire"}, limit=limit)] - - async def get_syslog(self) -> str: - """Return a list of syslog entries""" - ts = self._ruckus_timestamp() - syslog = await self.cmdstat( - f"" - f"" - ) - return syslog["xmsg"]["res"] - - async def get_backup(self) -> bytes: - """Return a backup""" - request = self.session.base_url + "/_savebackup.jsp?time=" + self._ruckus_backup_timestamp() - return await self.session.request_file(request, 60) - - async def do_block_client(self, mac: str) -> None: - """Block a client""" - mac = self._normalize_mac(mac) - await self.cmdstat( - f"" - f"" - f"" - ) - - async def do_unblock_client(self, mac: str) -> None: - """Unblock a client""" - mac = self._normalize_mac(mac) - blocked = await self.get_blocked_client_macs() - remaining = ''.join(( - f"" for deny in blocked - if deny["mac"] != mac - )) - await self._do_conf( - f"" - f"" - f"{remaining}" - ) - - async def do_delete_ap_group(self, name: str) -> bool: - """Delete an AP group""" - ap_group = await self._find_ap_group_by_name(name) - if ap_group is None: - return False - ts = self._ruckus_timestamp() - await self._do_conf( - f"" - f"" - ) - return True - - async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None: - """Disable a WLAN""" - wlan = await self._find_wlan_by_name(name) - if wlan: - ts = self._ruckus_timestamp() - await self._do_conf( - f"" - f"" - ) - - async def do_enable_wlan(self, name: str) -> None: - """Enable a WLAN""" - await self.do_disable_wlan(name, False) - - async def do_set_wlan_password( - self, - name: str, - passphrase: str, - sae_passphrase: str = None - ) -> None: - """Set a WLAN password""" - sae_passphrase = sae_passphrase or passphrase - await self.do_edit_wlan( - name, {"wpa": {"passphrase": passphrase, "sae-passphrase": sae_passphrase}}, True - ) - - async def do_add_wlan( - self, - name: str, - encryption: WlanEncryption = WlanEncryption.WPA2, - passphrase: str = None, - sae_passphrase: str = None, - ssid_override: str = None, - ignore_unknown_attributes: bool = False - ) -> None: - """Add a WLAN""" - patch = {"name": name, "ssid": ssid_override or name, "encryption": encryption.value} - if passphrase is not None or sae_passphrase is not None: - patch_wpa = {} - patch["wpa"] = patch_wpa - if passphrase is not None: - patch_wpa["passphrase"] = passphrase - if sae_passphrase is not None: - patch_wpa["sae-passphrase"] = sae_passphrase - await self.do_clone_wlan(patch) - - async def do_clone_wlan( - self, template: dict, new_name: str = None, new_ssid: str = None - ) -> None: - """Clone a WLAN""" - wlansvc = await self._get_default_wlan_template() - self._normalize_encryption(wlansvc, template) - self._patch_template(wlansvc, template, True) - if new_name is not None or new_ssid is not None: - if new_name is None: - raise ValueError(ERROR_PASSPHRASE_NAME) - self._patch_template(wlansvc, {"name": new_name, "ssid": new_ssid or new_name }) - await self._add_wlan_template(wlansvc) - - async def do_edit_wlan( - self, name: str, patch: dict, patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR - ) -> None: - """Edit a WLAN""" - wlansvc = await self._get_wlan_template(name) - if wlansvc: - self._normalize_encryption(wlansvc, patch) - self._patch_template(wlansvc, patch, patch_new_attributes) - await self._update_wlan_template(wlansvc) - - async def do_delete_wlan(self, name: str) -> bool: - """Delete a WLAN""" - wlan = await self._find_wlan_by_name(name) - if wlan is None: - return False - ts = self._ruckus_timestamp() - await self._do_conf( - f"" - f"", timeout=20 - ) - return True - - async def do_add_wlan_group(self, name: str, description: str = "", wlans: List = None) -> None: - """Add a WLAN group""" - wlangroup = ET.Element("wlangroup", {"name": name, "description": description or ""}) - if wlans is not None: - wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()} - for wlansvc in wlans: - wlan_name = None - if isinstance(wlansvc, str): - if wlansvc in wlan_map: - wlan_name = wlansvc - elif isinstance(wlansvc, dict): - if "name" in wlansvc and wlansvc["name"] in wlan_map: - wlan_name = wlansvc["name"] - if wlan_name is None: - raise ValueError(ERROR_INVALID_WLAN) - ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlan_name]}) - await self._do_conf( - f"" - f"{ET.tostring(wlangroup).decode('utf-8')}" - ) - - async def do_clone_wlan_group(self, template: dict, name: str, description: str = None) -> None: - """Clone a WLAN group""" - wlangroup = ET.Element("wlangroup", { - "name": name, - "description": description or template.get("description", "") - }) - if "wlan" in template: - wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()} - for wlansvc in template["wlan"]: - ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlansvc["name"]]}) - await self._do_conf( - f"" - f"{ET.tostring(wlangroup).decode('utf-8')}" - ) - - async def do_delete_wlan_group(self, name: str) -> bool: - """Delete a WLAN group""" - wlang = await self._find_wlan_group_by_name(name) - if wlang is None: - return False - ts = self._ruckus_timestamp() - await self._do_conf( - f"" - f"" - ) - return True - - async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None: - """Hide AP LEDs""" - mac = self._normalize_mac(mac) - found_ap = await self._find_ap_by_mac(mac) - if found_ap: - ts = self._ruckus_timestamp() - await self._do_conf( - f"" - f"" - f"" - ) - - async def do_show_ap_leds(self, mac: str) -> None: - """Show AP LEDs""" - await self.do_hide_ap_leds(mac, False) - - async def do_restart_ap(self, mac: str) -> None: - """Restart AP""" - mac = self._normalize_mac(mac) - ts = self._ruckus_timestamp() - await self._cmdstat_noparse( - f"" - ) - - async def _get_default_apgroup_template(self) -> ET.Element: - """Get default AP group template""" - xml = await self.session.get_conf_str(ConfigItem.APGROUP_TEMPLATE) - root = ET.fromstring(xml) - return root.find(".//apgroup") - - async def _get_default_wlan_template(self) -> ET.Element: - """Get default WLAN template""" - xml = await self.session.get_conf_str(ConfigItem.WLANSVC_STANDARD_TEMPLATE) - root = ET.fromstring(xml) - wlansvc = root.find(".//wlansvc") - if wlansvc is not None: - return wlansvc - return self._get_default_cli_wlan_template() - - @staticmethod - def _get_default_cli_wlan_template() -> ET.Element: - wlansvc = ET.Element("wlansvc", { - "name": "default-standard-wlan", "ssid": "", "authentication": "open", - "encryption": "none", "is-guest": "false", "max-clients-per-radio": "100", - "do-802-11d": "disabled", "sta-info-extraction": "1", "force-dhcp": "0", - "force-dhcp-timeout": "10", "usage": "user", "policy-id": "", "policy6-id": "", - "precedence-id": "1", "devicepolicy-id": "", "role-based-access-ctrl": "false", - "acl-id": "1", "local-bridge": "1", "client-isolation": "disabled", - "ci-whitelist-id": "0", "bgscan": "1", "idle-timeout": "1", "max-idle-timeout": "300", - "dis-dgaf": "0", "authstats": "0", "https-redirection": "disabled" - }) - ET.SubElement(wlansvc, "qos", {"uplink-preset": "DISABLE", "downlink-preset": "DISABLE"}) - ET.SubElement(wlansvc, "queue-priority", { - "voice": "0", "video": "2", "data": "4", "background": "6" - }) - ET.SubElement(wlansvc, "wlan-schedule", { - "value": "0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:" - "0x0:0x0:0x0:0x0: 0x0:0x0:0x0:0x0:0x0:0x0" - }) - return wlansvc - - async def _get_wlan_template(self, name: str) -> ET.Element | None: - xml = await self.session.get_conf_str(ConfigItem.WLANSVC_LIST) - root = ET.fromstring(xml) - wlansvc = root.find(f".//wlansvc[@name='{saxutils.escape(name)}']") - return wlansvc - - def _normalize_encryption(self, wlansvc: ET.Element, patch: dict): - patch_wpa = patch["wpa"] if "wpa" in patch else None - if patch_wpa is not None: - if "passphrase" in patch_wpa: - self._validate_passphrase(patch_wpa["passphrase"]) - if "sae-passphrase" in patch_wpa: - self._validate_passphrase(patch_wpa["sae-passphrase"]) - - encryption = wlansvc.get("encryption") - if "encryption" in patch and patch["encryption"] != encryption: - new_encryption = patch["encryption"] - wlansvc.set("encryption", new_encryption) - - wpa = wlansvc.find("wpa") - new_wpa = {"cipher": "aes", "dynamic-psk": "disabled"} - - if new_encryption in (WlanEncryption.WPA2.value, WlanEncryption.WPA23_MIXED.value): - passphrase = wpa.get("passphrase") if wpa is not None else None - if not patch_wpa.get("passphrase") and passphrase is None: - raise ValueError(ERROR_PASSPHRASE_MISSING) - new_wpa["passphrase"] = passphrase or "" - if new_encryption in (WlanEncryption.WPA3.value, WlanEncryption.WPA23_MIXED.value): - sae_passphrase = wpa.get("sae_passphrase") if wpa is not None else None - if not patch_wpa.get("sae_passphrase") and sae_passphrase is None: - raise ValueError(ERROR_SAEPASSPHRASE_MISSING) - new_wpa["sae-passphrase"] = sae_passphrase or "" - - if wpa is not None: - wlansvc.remove(wpa) - if new_encryption != WlanEncryption.NONE.value: - wpa = ET.SubElement(wlansvc, "wpa", new_wpa) - - def _patch_template( - self, - element: ET.Element, - patch: dict, - patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR, - current_path: str = "" - ) -> None: - visited_children = set() - for child in element: - if child.tag in patch and isinstance(patch[child.tag], dict): - self._patch_template( - child, - patch[child.tag], - patch_new_attributes, - f"{current_path}/{child.tag}" - ) - visited_children.add(child.tag) - for name, value in patch.items(): - if name in visited_children: - pass - else: - current_value = element.get(name) - if isinstance(value, List): - raise ValueError(f"Applying lists is unsupported: {current_path}/{name}") - if current_value is None: - if patch_new_attributes == PatchNewAttributeMode.ERROR: - raise ValueError(f"Unknown attribute: {current_path}/{name}") - elif patch_new_attributes == PatchNewAttributeMode.IGNORE: - continue - else: - value = self._normalize_conf_value(current_value, value) - element.set(name, value) - x_name = f"x-{name}" - if x_name not in patch and x_name in element.attrib: - element.set(x_name, value) - - async def _update_wlan_template(self, wlansvc: ET.Element): - """Update WLAN template""" - xml_bytes = ET.tostring(wlansvc) - await self._do_conf( - f"" - f"{xml_bytes.decode('utf-8')}", timeout=20 - ) - - async def _add_wlan_template(self, wlansvc: ET.Element): - """Add WLAN template""" - xml_bytes = ET.tostring(wlansvc) - await self._do_conf( - f"" - f"{xml_bytes.decode('utf-8')}", timeout=20 - ) - - async def _find_ap_by_mac(self, mac: str) -> dict: - """Find AP by MAC""" - return next((ap for ap in await self.get_aps() if ap["mac"] == mac), None) - - async def _find_ap_group_by_name(self, name: str) -> dict: - """Find AP group by name""" - return next(( - ap_group for ap_group in await self.get_ap_groups() if ap_group["name"] == name - ), None) - - async def _find_wlan_by_name(self, name: str) -> dict: - """Find WLAN by name""" - return next(( - wlan for wlan in await self.get_wlans() if wlan["name"] == name - ), None) - - async def _find_wlan_group_by_name(self, name: str) -> dict: - """Find WLAN group by name""" - return next(( - wlang for wlang in await self.get_wlan_groups() if wlang["name"] == name - ), None) - - async def _get_timestamp_at_controller(self) -> int: - """Get timestamp at controller""" - ts = self._ruckus_timestamp() - time_info = await self.cmdstat( - f"" - f"" - ) - return int(time_info["response"]["time"]["time"]) - - async def _cmdstat_noparse(self, data: str, timeout: int | None = None) -> str: - """Call cmdstat without parsing response""" - return await self.session.request(self.session.cmdstat_url, data, timeout) - - async def cmdstat( - self, data: str, collection_elements: List[str] = None, aggressive_unwrap: bool = True, - timeout: int | None = None - ) -> dict | List: - """Call cmdstat and parse xml result""" - result_text = await self._cmdstat_noparse(data, timeout) - return self._ruckus_xml_unwrap(result_text, collection_elements, aggressive_unwrap) - - async def cmdstat_piecewise( - self, comp: str, element_type: str, element_collection: str | None = None, filter: Dict[str, Any] | None = None, limit: int = 300, page_size: int | None = None, updater: str | None = None, timeout: int | None = None - ) -> AsyncIterator[dict]: - """Call cmdstat and parse piecewise xml results""" - - ts_time = self._ruckus_timestamp(random_part=False) - ts_random = self._ruckus_timestamp(time_part=False) - updater = updater or comp - page_size = page_size or limit - - piece_stat = { - "@pid": 0, - "@start": 0, - "@number": page_size, - "@requestId": f"{updater}.{ts_time}", - "@cleanupId": f"{updater}.{ts_time}.{ts_random}" - } - - request = {"ajax-request": { - "@action": "getstat", - "@comp": comp, - "@updater": f"{updater}.{ts_time}.{ts_random}", - element_type : self._get_event_filter(filter), - "pieceStat" : piece_stat - }} - - pid = 0 - item_number = 0 - element_collection = element_collection or "response" - - while True: - pid += 1 - if page_size > limit > 0: - page_size = limit - - piece_stat["@pid"] = pid - piece_stat["@start"] = item_number - piece_stat["@number"] = page_size - - request_xml = xmltodict.unparse(request, full_document=False, short_empty_elements=True) - response = (await self.cmdstat(request_xml, [element_type], aggressive_unwrap=False))[element_collection] - - if element_type not in response: - return - for element in response[element_type]: - yield element - item_number += 1 - if limit == 1: - return - limit -= 1 - if response["done"] == "true": - return - - @staticmethod - def _get_event_filter(filter: Dict[str, Any] = None, sort_by: str = "time", sort_descending: bool = True) -> str: - - result = { - "@sortBy": sort_by, - "@sortDirection": -1 if sort_descending else 1 - } - if filter is not None: - for key, values in filter.items(): - if isinstance(values, str): - result[f"@{key}"] = values - else: - joined_values = f"|{'|'.join(values)}|" - result[f"@{key}"] = joined_values - return result - - async def _conf_noparse(self, data: str, timeout: int | None = None) -> str: - """Call conf without parsing response""" - return await self.session.request(self.session.conf_url, data, timeout) - - async def conf( - self, data: str, collection_elements: List[str] = None, timeout: int | None = None - ) -> dict | List: - """Call conf and parse xml result""" - result_text = await self._conf_noparse(data, timeout) - return self._ruckus_xml_unwrap(result_text, collection_elements) - - async def _do_conf( - self, data: str, collection_elements: List[str] = None, timeout: int | None = None - ) -> None: - """Call conf and confirm success""" - result = await self.conf(data, collection_elements, timeout) - if "xmsg" in result: - raise ValueError(result["xmsg"]["lmsg"]) - - @staticmethod - def _ruckus_timestamp(time_part: bool = True, random_part: bool = True) -> str: - return f"{int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000) if time_part else ''}{('.' if time_part and random_part else '')}{int(9000 * random.random()) + 1000 if random_part else ''}" - - @staticmethod - def _ruckus_backup_timestamp() -> str: - return datetime.datetime.now(datetime.timezone.utc).strftime("%m%d%y_%H_%M") - - @staticmethod - def _normalize_mac(mac: str) -> str: - """Normalize MAC address format""" - if mac and match(r"(?:[0-9a-f]{2}[:-]){5}[0-9a-f]{2}", string=mac, flags=IGNORECASE): - return mac.replace('-', ':').lower() - raise ValueError(ERROR_INVALID_MAC) - - @staticmethod - def _validate_passphrase(passphrase: str) -> str: - """Validate passphrase against ZoneDirector/Unleashed rules""" - if passphrase and match(r".*<.*>.*", string=passphrase): - raise ValueError(ERROR_PASSPHRASE_JS) - if passphrase and match( - r"(^[!-~]([ -~]){6,61}[!-~]$)|(^([0-9a-fA-F]){64}$)", string=passphrase - ): - return passphrase - raise ValueError(ERROR_PASSPHRASE_LEN) - \ No newline at end of file diff --git a/aioruckus/ruckusajaxapi.py b/aioruckus/ruckusajaxapi.py index ab272ea..82e63c0 100644 --- a/aioruckus/ruckusajaxapi.py +++ b/aioruckus/ruckusajaxapi.py @@ -1,185 +1,657 @@ -"""Adds AJAX Statistics and Command methods to RuckusApi""" - -from abc import abstractmethod -from re import IGNORECASE, match -from typing import List - -from .const import ( - ERROR_INVALID_MAC, - ERROR_PASSPHRASE_LEN, - ERROR_PASSPHRASE_JS, - PatchNewAttributeMode, - SystemStat -) -from .ruckusapi import RuckusApi - -class RuckusAjaxApi(RuckusApi): - """Ruckus Configuration, Statistics and Commands API""" - - @abstractmethod - async def get_system_info(self, *sections: SystemStat) -> dict: - pass - - @abstractmethod - async def get_active_clients(self, interval_stats: bool = False) -> List: - """Return a list of active clients""" - pass - - @abstractmethod - async def get_inactive_clients(self) -> List: - """Return a list of inactive clients""" - pass - - @abstractmethod - async def get_ap_stats(self) -> List: - """Return a list of AP statistics""" - pass - - @abstractmethod - async def get_ap_group_stats(self) -> List: - """Return a list of AP group statistics""" - pass - - @abstractmethod - async def get_vap_stats(self) -> List: - """Return a list of Virtual AP (per-radio WLAN) statistics""" - pass - - @abstractmethod - async def get_wlan_group_stats(self) -> List: - """Return a list of WLAN group statistics""" - pass - - @abstractmethod - async def get_dpsk_stats(self) -> List: - """Return a list of AP group statistics""" - pass - - @abstractmethod - async def get_active_rogues(self) -> list[dict]: - """Return a list of currently active rogue devices""" - pass - - @abstractmethod - async def get_known_rogues(self, limit: int = 300) -> list[dict]: - """Return a list of known/recognized rogues devices""" - pass - - @abstractmethod - async def get_blocked_rogues(self, limit: int = 300) -> list[dict]: - """Return a list of user blocked rogues devices""" - pass - - @abstractmethod - async def get_all_alarms(self, limit: int = 300) -> list[dict]: - """Return a list of all alerts""" - pass - - @abstractmethod - async def get_all_events(self, limit: int = 300) -> list[dict]: - """Return a list of all events""" - pass - - @abstractmethod - async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]: - """Return a list of WLAN events""" - pass - - @abstractmethod - async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]: - """Return a list of AP events""" - pass - - @abstractmethod - async def get_client_events(self, limit: int = 300) -> list[dict]: - """Return a list of client events""" - pass - - @abstractmethod - async def get_wired_client_events(self, limit: int = 300) -> list[dict]: - """Return a list of wired client events""" - pass - - @abstractmethod - async def get_syslog(self) -> str: - """Return a list of syslog entries""" - pass - - @abstractmethod - async def get_backup(self) -> bytes: - """Return a backup""" - pass - - @abstractmethod - async def do_block_client(self, mac: str) -> None: - """Block a client""" - pass - - @abstractmethod - async def do_unblock_client(self, mac: str) -> None: - """Unblock a client""" - pass - - @abstractmethod - async def do_delete_ap_group(self, name: str) -> bool: - """Delete an AP group""" - pass - - @abstractmethod - async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None: - """Disable a WLAN""" - pass - - @abstractmethod - async def do_enable_wlan(self, name: str) -> None: - """Enable a WLAN""" - pass - - @abstractmethod - async def do_edit_wlan( - self, name: str, patch: dict, patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR - ) -> None: - pass - - @abstractmethod - async def do_set_wlan_password( - self, - name: str, - passphrase: str, - sae_passphrase: str = None - ) -> None: - pass - - @abstractmethod - async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None: - """Hide AP LEDs""" - pass - - @abstractmethod - async def do_show_ap_leds(self, mac: str) -> None: - """Show AP LEDs""" - pass - - @abstractmethod - async def do_restart_ap(self, mac: str) -> None: - """Restart AP""" - pass - - @staticmethod - def _normalize_mac(mac: str) -> str: - """Normalize MAC address format""" - if mac and match(r"(?:[0-9a-f]{2}[:-]){5}[0-9a-f]{2}", string=mac, flags=IGNORECASE): - return mac.replace('-', ':').lower() - raise ValueError(ERROR_INVALID_MAC) - - @staticmethod - def _validate_passphrase(passphrase: str) -> str: - """Validate passphrase against ZoneDirector/Unleashed rules""" - if passphrase and match(r".*<.*>.*", string=passphrase): - raise ValueError(ERROR_PASSPHRASE_JS) - if passphrase and match( - r"(^[!-~]([ -~]){6,61}[!-~]$)|(^([0-9a-fA-F]){64}$)", string=passphrase - ): - return passphrase - raise ValueError(ERROR_PASSPHRASE_LEN) +"""Adds AJAX Statistics and Command methods to RuckusApi""" + +from collections.abc import AsyncIterator +import datetime +import random +from re import IGNORECASE, match +from typing import Any, Dict, List +import xml.etree.ElementTree as ET +from xml.sax import saxutils +import xmltodict + +from aioruckus.typing_policy import L2Policy + +from .const import ( + ERROR_ACL_NOT_FOUND, + ERROR_ACL_SYSTEM, + ERROR_ACL_TOO_BIG, + ERROR_INVALID_MAC, + ERROR_PASSPHRASE_LEN, + ERROR_PASSPHRASE_JS, + ERROR_PASSPHRASE_MISSING, + ERROR_SAEPASSPHRASE_MISSING, + ERROR_INVALID_WLAN, + ERROR_PASSPHRASE_NAME, + PatchNewAttributeMode, + SystemStat, + WlanEncryption +) +from .abcsession import ConfigItem +from .ajaxsession import AjaxSession +from .ruckusapi import RuckusApi + +class RuckusAjaxApi(RuckusApi): + """Ruckus ZoneDirector or Unleashed Configuration, Statistics and Commands API""" + def __init__(self, session: AjaxSession): + super().__init__(session) + + async def get_system_info(self, *sections: SystemStat) -> dict: + sections = ( + [s for section_list in sections for s in section_list.value] + if sections else SystemStat.DEFAULT.value + ) + section = ''.join(f"<{s}/>" for s in sections) + sysinfo = await self.cmdstat( + f"{section}" + ) + return sysinfo.get("response", sysinfo.get("system")) + + async def get_active_clients(self, interval_stats: bool = False) -> List: + """Return a list of active clients""" + if interval_stats: + endtime = await self._get_timestamp_at_controller() + starttime = endtime - 86400 + clientrequest = f"" + else: + clientrequest = "" + return await self.cmdstat(f"{clientrequest}", ["client"]) + + async def get_inactive_clients(self) -> List: + """Return a list of inactive clients""" + return await self.cmdstat("", ["client"]) + + async def get_ap_stats(self) -> List: + """Return a list of AP statistics""" + return await self.cmdstat( + "" + "", ["ap"] + ) + + async def get_ap_group_stats(self) -> List: + """Return a list of AP group statistics""" + return await self.cmdstat( + "" + "", ["group", "radio", "ap"] + ) + + async def get_vap_stats(self) -> List: + """Return a list of Virtual AP (per-radio WLAN) statistics""" + return await self.cmdstat( + "" + "", ["vap"] + ) + + async def get_wlan_group_stats(self) -> List: + """Return a list of WLAN group statistics""" + return await self.cmdstat( + "" + "", ["wlangroup", "wlan"] + ) + + async def get_dpsk_stats(self) -> List: + """Return a list of AP group statistics""" + return await self.cmdstat( + "" + "", ["dpsk"] + ) + + async def get_active_rogues(self) -> list[dict]: + """Return a list of currently active rogue devices""" + return await self.cmdstat( + "" + "", ["rogue"] + ) + + async def get_known_rogues(self, limit: int = 300) -> list[dict]: + """Return a list of known/recognized rogues devices""" + return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "recognized": "true"}, updater="krogue", limit=limit)] + + async def get_blocked_rogues(self, limit: int = 300) -> list[dict]: + """Return a list of user blocked rogues devices""" + return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "blocked": "true"}, updater="brogue", limit=limit)] + + async def get_all_alarms(self, limit: int = 300) -> list[dict]: + """Return a list of all alerts""" + return [alarm async for alarm in self.cmdstat_piecewise("eventd", "alarm", updater="page", limit=limit)] + + async def get_all_events(self, limit: int = 300) -> list[dict]: + """Return a list of all events""" + return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", limit=limit)] + + async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]: + """Return a list of WLAN events""" + return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"wlan": list(wlan_ids) if wlan_ids else "*"}, limit=limit)] + + async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]: + """Return a list of AP events""" + return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"ap": list(self._normalize_mac(mac) for mac in ap_macs) if ap_macs else "*"}, limit=limit)] + + async def get_client_events(self, limit: int = 300) -> list[dict]: + """Return a list of client events""" + return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "user"}, limit=limit)] + + async def get_wired_client_events(self, limit: int = 300) -> list[dict]: + """Return a list of wired client events""" + return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "wire"}, limit=limit)] + + async def get_syslog(self) -> str: + """Return a list of syslog entries""" + ts = self._ruckus_timestamp() + syslog = await self.cmdstat( + f"" + f"" + ) + return syslog["xmsg"]["res"] + + async def get_backup(self) -> bytes: + """Return a backup""" + request = self.session.base_url + "/_savebackup.jsp?time=" + self._ruckus_backup_timestamp() + return await self.session.request_file(request, 60) + + async def do_block_client(self, mac: str) -> None: + """Block a client""" + mac = self._normalize_mac(mac) + await self.cmdstat( + f"" + f"" + f"" + ) + + async def do_unblock_client(self, mac: str) -> None: + """Unblock a client""" + mac = self._normalize_mac(mac) + blocked = await self.get_blocked_client_macs() + remaining = ''.join(( + f"" for deny in blocked + if deny["mac"] != mac + )) + await self._do_conf( + f"" + f"" + f"{remaining}" + ) + + async def do_set_acl_members(self, name: str, macs: list[str]) -> bool: + """Set ACL members""" + acl = await self._find_acl_by_name(name) + if acl is None: + raise ValueError(ERROR_ACL_NOT_FOUND) + if acl["id"] == 1: + raise ValueError(ERROR_ACL_SYSTEM) + if len(macs) > 128: + raise ValueError(ERROR_ACL_TOO_BIG) + + macs = [self._normalize_mac(mac) for mac in macs] + acl_tag = "deny" if acl["default-mode"] == "allow" else "accept" + + acl = ET.Element("acl", { + "id": acl["id"], + "name": acl["name"], + "description": acl["description"], + "default-mode": acl["default-mode"] + }) + for mac in macs: + ET.SubElement(acl, acl_tag, {"mac": mac}) + + await self._do_conf( + f"" + f"{ET.tostring(acl).decode('utf-8')}" + ) + + async def do_delete_ap_group(self, name: str) -> bool: + """Delete an AP group""" + ap_group = await self._find_ap_group_by_name(name) + if ap_group is None: + return False + ts = self._ruckus_timestamp() + await self._do_conf( + f"" + f"" + ) + return True + + async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None: + """Disable a WLAN""" + wlan = await self._find_wlan_by_name(name) + if wlan: + ts = self._ruckus_timestamp() + await self._do_conf( + f"" + f"" + ) + + async def do_enable_wlan(self, name: str) -> None: + """Enable a WLAN""" + await self.do_disable_wlan(name, False) + + async def do_set_wlan_password( + self, + name: str, + passphrase: str, + sae_passphrase: str = None + ) -> None: + """Set a WLAN password""" + sae_passphrase = sae_passphrase or passphrase + await self.do_edit_wlan( + name, {"wpa": {"passphrase": passphrase, "sae-passphrase": sae_passphrase}}, True + ) + + async def do_add_wlan( + self, + name: str, + encryption: WlanEncryption = WlanEncryption.WPA2, + passphrase: str = None, + sae_passphrase: str = None, + ssid_override: str = None, + ignore_unknown_attributes: bool = False + ) -> None: + """Add a WLAN""" + patch = {"name": name, "ssid": ssid_override or name, "encryption": encryption.value} + if passphrase is not None or sae_passphrase is not None: + patch_wpa = {} + patch["wpa"] = patch_wpa + if passphrase is not None: + patch_wpa["passphrase"] = passphrase + if sae_passphrase is not None: + patch_wpa["sae-passphrase"] = sae_passphrase + await self.do_clone_wlan(patch) + + async def do_clone_wlan( + self, template: dict, new_name: str = None, new_ssid: str = None + ) -> None: + """Clone a WLAN""" + wlansvc = await self._get_default_wlan_template() + self._normalize_encryption(wlansvc, template) + self._patch_template(wlansvc, template, True) + if new_name is not None or new_ssid is not None: + if new_name is None: + raise ValueError(ERROR_PASSPHRASE_NAME) + self._patch_template(wlansvc, {"name": new_name, "ssid": new_ssid or new_name }) + await self._add_wlan_template(wlansvc) + + async def do_edit_wlan( + self, name: str, patch: dict, patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR + ) -> None: + """Edit a WLAN""" + wlansvc = await self._get_wlan_template(name) + if wlansvc: + self._normalize_encryption(wlansvc, patch) + self._patch_template(wlansvc, patch, patch_new_attributes) + await self._update_wlan_template(wlansvc) + + async def do_delete_wlan(self, name: str) -> bool: + """Delete a WLAN""" + wlan = await self._find_wlan_by_name(name) + if wlan is None: + return False + ts = self._ruckus_timestamp() + await self._do_conf( + f"" + f"", timeout=20 + ) + return True + + async def do_add_wlan_group(self, name: str, description: str = "", wlans: List = None) -> None: + """Add a WLAN group""" + wlangroup = ET.Element("wlangroup", {"name": name, "description": description or ""}) + if wlans is not None: + wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()} + for wlansvc in wlans: + wlan_name = None + if isinstance(wlansvc, str): + if wlansvc in wlan_map: + wlan_name = wlansvc + elif isinstance(wlansvc, dict): + if "name" in wlansvc and wlansvc["name"] in wlan_map: + wlan_name = wlansvc["name"] + if wlan_name is None: + raise ValueError(ERROR_INVALID_WLAN) + ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlan_name]}) + await self._do_conf( + f"" + f"{ET.tostring(wlangroup).decode('utf-8')}" + ) + + async def do_clone_wlan_group(self, template: dict, name: str, description: str = None) -> None: + """Clone a WLAN group""" + wlangroup = ET.Element("wlangroup", { + "name": name, + "description": description or template.get("description", "") + }) + if "wlan" in template: + wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()} + for wlansvc in template["wlan"]: + ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlansvc["name"]]}) + await self._do_conf( + f"" + f"{ET.tostring(wlangroup).decode('utf-8')}" + ) + + async def do_delete_wlan_group(self, name: str) -> bool: + """Delete a WLAN group""" + wlang = await self._find_wlan_group_by_name(name) + if wlang is None: + return False + ts = self._ruckus_timestamp() + await self._do_conf( + f"" + f"" + ) + return True + + async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None: + """Hide AP LEDs""" + mac = self._normalize_mac(mac) + found_ap = await self._find_ap_by_mac(mac) + if found_ap: + ts = self._ruckus_timestamp() + await self._do_conf( + f"" + f"" + f"" + ) + + async def do_show_ap_leds(self, mac: str) -> None: + """Show AP LEDs""" + await self.do_hide_ap_leds(mac, False) + + async def do_restart_ap(self, mac: str) -> None: + """Restart AP""" + mac = self._normalize_mac(mac) + ts = self._ruckus_timestamp() + await self._cmdstat_noparse( + f"" + ) + + async def _get_default_apgroup_template(self) -> ET.Element: + """Get default AP group template""" + xml = await self.session.get_conf_str(ConfigItem.APGROUP_TEMPLATE) + root = ET.fromstring(xml) + return root.find(".//apgroup") + + async def _get_default_wlan_template(self) -> ET.Element: + """Get default WLAN template""" + xml = await self.session.get_conf_str(ConfigItem.WLANSVC_STANDARD_TEMPLATE) + root = ET.fromstring(xml) + wlansvc = root.find(".//wlansvc") + if wlansvc is not None: + return wlansvc + return self._get_default_cli_wlan_template() + + @staticmethod + def _get_default_cli_wlan_template() -> ET.Element: + wlansvc = ET.Element("wlansvc", { + "name": "default-standard-wlan", "ssid": "", "authentication": "open", + "encryption": "none", "is-guest": "false", "max-clients-per-radio": "100", + "do-802-11d": "disabled", "sta-info-extraction": "1", "force-dhcp": "0", + "force-dhcp-timeout": "10", "usage": "user", "policy-id": "", "policy6-id": "", + "precedence-id": "1", "devicepolicy-id": "", "role-based-access-ctrl": "false", + "acl-id": "1", "local-bridge": "1", "client-isolation": "disabled", + "ci-whitelist-id": "0", "bgscan": "1", "idle-timeout": "1", "max-idle-timeout": "300", + "dis-dgaf": "0", "authstats": "0", "https-redirection": "disabled" + }) + ET.SubElement(wlansvc, "qos", {"uplink-preset": "DISABLE", "downlink-preset": "DISABLE"}) + ET.SubElement(wlansvc, "queue-priority", { + "voice": "0", "video": "2", "data": "4", "background": "6" + }) + ET.SubElement(wlansvc, "wlan-schedule", { + "value": "0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:" + "0x0:0x0:0x0:0x0: 0x0:0x0:0x0:0x0:0x0:0x0" + }) + return wlansvc + + async def _get_wlan_template(self, name: str) -> ET.Element | None: + xml = await self.session.get_conf_str(ConfigItem.WLANSVC_LIST) + root = ET.fromstring(xml) + wlansvc = root.find(f".//wlansvc[@name='{saxutils.escape(name)}']") + return wlansvc + + def _normalize_encryption(self, wlansvc: ET.Element, patch: dict): + patch_wpa = patch["wpa"] if "wpa" in patch else None + if patch_wpa is not None: + if "passphrase" in patch_wpa: + self._validate_passphrase(patch_wpa["passphrase"]) + if "sae-passphrase" in patch_wpa: + self._validate_passphrase(patch_wpa["sae-passphrase"]) + + encryption = wlansvc.get("encryption") + if "encryption" in patch and patch["encryption"] != encryption: + new_encryption = patch["encryption"] + wlansvc.set("encryption", new_encryption) + + wpa = wlansvc.find("wpa") + new_wpa = {"cipher": "aes", "dynamic-psk": "disabled"} + + if new_encryption in (WlanEncryption.WPA2.value, WlanEncryption.WPA23_MIXED.value): + passphrase = wpa.get("passphrase") if wpa is not None else None + if not patch_wpa.get("passphrase") and passphrase is None: + raise ValueError(ERROR_PASSPHRASE_MISSING) + new_wpa["passphrase"] = passphrase or "" + if new_encryption in (WlanEncryption.WPA3.value, WlanEncryption.WPA23_MIXED.value): + sae_passphrase = wpa.get("sae_passphrase") if wpa is not None else None + if not patch_wpa.get("sae_passphrase") and sae_passphrase is None: + raise ValueError(ERROR_SAEPASSPHRASE_MISSING) + new_wpa["sae-passphrase"] = sae_passphrase or "" + + if wpa is not None: + wlansvc.remove(wpa) + if new_encryption != WlanEncryption.NONE.value: + wpa = ET.SubElement(wlansvc, "wpa", new_wpa) + + def _patch_template( + self, + element: ET.Element, + patch: dict, + patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR, + current_path: str = "" + ) -> None: + visited_children = set() + for child in element: + if child.tag in patch and isinstance(patch[child.tag], dict): + self._patch_template( + child, + patch[child.tag], + patch_new_attributes, + f"{current_path}/{child.tag}" + ) + visited_children.add(child.tag) + for name, value in patch.items(): + if name in visited_children: + pass + else: + current_value = element.get(name) + if isinstance(value, List): + raise ValueError(f"Applying lists is unsupported: {current_path}/{name}") + if current_value is None: + if patch_new_attributes == PatchNewAttributeMode.ERROR: + raise ValueError(f"Unknown attribute: {current_path}/{name}") + elif patch_new_attributes == PatchNewAttributeMode.IGNORE: + continue + else: + value = self._normalize_conf_value(current_value, value) + element.set(name, value) + x_name = f"x-{name}" + if x_name not in patch and x_name in element.attrib: + element.set(x_name, value) + + async def _update_wlan_template(self, wlansvc: ET.Element): + """Update WLAN template""" + xml_bytes = ET.tostring(wlansvc) + await self._do_conf( + f"" + f"{xml_bytes.decode('utf-8')}", timeout=20 + ) + + async def _add_wlan_template(self, wlansvc: ET.Element): + """Add WLAN template""" + xml_bytes = ET.tostring(wlansvc) + await self._do_conf( + f"" + f"{xml_bytes.decode('utf-8')}", timeout=20 + ) + + async def _find_ap_by_mac(self, mac: str) -> dict: + """Find AP by MAC""" + return next((ap for ap in await self.get_aps() if ap["mac"] == mac), None) + + async def _find_ap_group_by_name(self, name: str) -> dict: + """Find AP group by name""" + return next(( + ap_group for ap_group in await self.get_ap_groups() if ap_group["name"] == name + ), None) + + async def _find_wlan_by_name(self, name: str) -> dict: + """Find WLAN by name""" + return next(( + wlan for wlan in await self.get_wlans() if wlan["name"] == name + ), None) + + async def _find_wlan_group_by_name(self, name: str) -> dict: + """Find WLAN group by name""" + return next(( + wlang for wlang in await self.get_wlan_groups() if wlang["name"] == name + ), None) + + async def _find_acl_by_name(self, name: str) -> L2Policy | dict: + """Find L2 ACL by name""" + return next(( + acl for acl in await self.get_acls() if acl["name"] == name + ), None) + + async def _get_timestamp_at_controller(self) -> int: + """Get timestamp at controller""" + ts = self._ruckus_timestamp() + time_info = await self.cmdstat( + f"" + f"" + ) + return int(time_info["response"]["time"]["time"]) + + async def _cmdstat_noparse(self, data: str, timeout: int | None = None) -> str: + """Call cmdstat without parsing response""" + return await self.session.request(self.session.cmdstat_url, data, timeout) + + async def cmdstat( + self, data: str, collection_elements: List[str] = None, aggressive_unwrap: bool = True, + timeout: int | None = None + ) -> dict | List: + """Call cmdstat and parse xml result""" + result_text = await self._cmdstat_noparse(data, timeout) + return self._ruckus_xml_unwrap(result_text, collection_elements, aggressive_unwrap) + + async def cmdstat_piecewise( + self, comp: str, element_type: str, element_collection: str | None = None, filter: Dict[str, Any] | None = None, limit: int = 300, page_size: int | None = None, updater: str | None = None, timeout: int | None = None + ) -> AsyncIterator[dict]: + """Call cmdstat and parse piecewise xml results""" + + ts_time = self._ruckus_timestamp(random_part=False) + ts_random = self._ruckus_timestamp(time_part=False) + updater = updater or comp + page_size = page_size or limit + + piece_stat = { + "@pid": 0, + "@start": 0, + "@number": page_size, + "@requestId": f"{updater}.{ts_time}", + "@cleanupId": f"{updater}.{ts_time}.{ts_random}" + } + + request = {"ajax-request": { + "@action": "getstat", + "@comp": comp, + "@updater": f"{updater}.{ts_time}.{ts_random}", + element_type : self._get_event_filter(filter), + "pieceStat" : piece_stat + }} + + pid = 0 + item_number = 0 + element_collection = element_collection or "response" + + while True: + pid += 1 + if page_size > limit > 0: + page_size = limit + + piece_stat["@pid"] = pid + piece_stat["@start"] = item_number + piece_stat["@number"] = page_size + + request_xml = xmltodict.unparse(request, full_document=False, short_empty_elements=True) + response = (await self.cmdstat(request_xml, [element_type], aggressive_unwrap=False))[element_collection] + + if element_type not in response: + return + for element in response[element_type]: + yield element + item_number += 1 + if limit == 1: + return + limit -= 1 + if response["done"] == "true": + return + + @staticmethod + def _get_event_filter(filter: Dict[str, Any] = None, sort_by: str = "time", sort_descending: bool = True) -> str: + + result = { + "@sortBy": sort_by, + "@sortDirection": -1 if sort_descending else 1 + } + if filter is not None: + for key, values in filter.items(): + if isinstance(values, str): + result[f"@{key}"] = values + else: + joined_values = f"|{'|'.join(values)}|" + result[f"@{key}"] = joined_values + return result + + async def _conf_noparse(self, data: str, timeout: int | None = None) -> str: + """Call conf without parsing response""" + return await self.session.request(self.session.conf_url, data, timeout) + + async def conf( + self, data: str, collection_elements: List[str] = None, timeout: int | None = None + ) -> dict | List: + """Call conf and parse xml result""" + result_text = await self._conf_noparse(data, timeout) + return self._ruckus_xml_unwrap(result_text, collection_elements) + + async def _do_conf( + self, data: str, collection_elements: List[str] = None, timeout: int | None = None + ) -> None: + """Call conf and confirm success""" + result = await self.conf(data, collection_elements, timeout) + if "xmsg" in result: + raise ValueError(result["xmsg"]["lmsg"]) + + @staticmethod + def _ruckus_timestamp(time_part: bool = True, random_part: bool = True) -> str: + return f"{int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000) if time_part else ''}{('.' if time_part and random_part else '')}{int(9000 * random.random()) + 1000 if random_part else ''}" + + @staticmethod + def _ruckus_backup_timestamp() -> str: + return datetime.datetime.now(datetime.timezone.utc).strftime("%m%d%y_%H_%M") + + @staticmethod + def _normalize_mac(mac: str) -> str: + """Normalize MAC address format""" + if mac and match(r"(?:[0-9a-f]{2}[:-]){5}[0-9a-f]{2}", string=mac, flags=IGNORECASE): + return mac.replace('-', ':').lower() + raise ValueError(ERROR_INVALID_MAC) + + @staticmethod + def _validate_passphrase(passphrase: str) -> str: + """Validate passphrase against ZoneDirector/Unleashed rules""" + if passphrase and match(r".*<.*>.*", string=passphrase): + raise ValueError(ERROR_PASSPHRASE_JS) + if passphrase and match( + r"(^[!-~]([ -~]){6,61}[!-~]$)|(^([0-9a-fA-F]){64}$)", string=passphrase + ): + return passphrase + raise ValueError(ERROR_PASSPHRASE_LEN) \ No newline at end of file diff --git a/aioruckus/ruckusapi.py b/aioruckus/ruckusapi.py index 9a48438..682060c 100644 --- a/aioruckus/ruckusapi.py +++ b/aioruckus/ruckusapi.py @@ -332,6 +332,11 @@ def _process_ruckus_xml(path, key, value): if key == "apstamgr-stat" and not value: # return an empty array rather than None, for ease of use return key, [] + if ( + (key == "accept" or key == "deny") and not value and + path and len(path) > 0 and path[-1][0] == "acl" + ): + return key, [] if ( key == "status" and value and value.isnumeric() and diff --git a/aioruckus/smartzoneajaxapi.py b/aioruckus/smartzoneajaxapi.py index bfbb921..c383add 100644 --- a/aioruckus/smartzoneajaxapi.py +++ b/aioruckus/smartzoneajaxapi.py @@ -1,4 +1,4 @@ -"""Adds AJAX Statistics and Command methods to RuckusApi""" +"""Adds enough AJAX methods to RuckusApi to support Home Assistant""" from re import IGNORECASE, match from typing import List @@ -26,59 +26,59 @@ async def get_aps(self) -> List[dict]: async def get_ap_groups(self) -> List: """Return a list of AP groups""" - pass + raise NotImplementedError async def get_wlans(self) -> List[dict]: """Return a list of WLANs""" - pass + raise NotImplementedError async def get_wlan_groups(self) -> List[dict]: """Return a list of WLAN groups""" - pass + raise NotImplementedError async def get_urlfiltering_policies(self) -> list[UrlFilter | dict]: """Return a list of URL Filtering Policies""" - pass + raise NotImplementedError async def get_urlfiltering_blockingcategories(self) -> list[UrlBlockCategory | dict]: """Return a list of URL Filtering Blocking Categories""" - pass + raise NotImplementedError async def get_ip4_policies(self) -> list[Ip4Policy | dict]: """Return a list of IP4 Policies""" - pass + raise NotImplementedError async def get_ip6_policies(self) -> list[Ip6Policy | dict]: """Return a list of IP6 Policies""" - pass + raise NotImplementedError async def get_device_policies(self) -> list[DevicePolicy | dict]: """Return a list of Device Policies""" - pass + raise NotImplementedError async def get_precedence_policies(self) -> list[PrecedencePolicy | dict]: """Return a list of Precedence Policies""" - pass + raise NotImplementedError async def get_arc_policies(self) -> list[ArcPolicy | dict]: """Return a list of Application Recognition & Control Policies""" - pass + raise NotImplementedError async def get_arc_applications(self) -> list[ArcApplication | dict]: """Return a list of Application Recognition & Control User Defined Applications""" - pass + raise NotImplementedError async def get_arc_ports(self) -> list[ArcPort | dict]: """Return a list of Application Recognition & Control User Defined Ports""" - pass + raise NotImplementedError async def get_roles(self) -> list[Role | dict]: """Return a list of Roles""" - pass + raise NotImplementedError async def get_dpsks(self) -> list[Dpsk | dict]: """Return a list of DPSKs""" - pass + raise NotImplementedError async def get_system_info(self, *sections: SystemStat) -> dict: """Return system information""" @@ -104,15 +104,15 @@ async def __get_cluster_state(self) -> dict: async def get_zerotouch_mesh_ap_serials(self) -> dict: """Return a list of Pre-approved AP serial numbers""" - pass + raise NotImplementedError async def get_acls(self) -> list[L2Policy | dict]: """Return a list of ACLs""" - pass + raise NotImplementedError async def get_blocked_client_macs(self) -> list[L2Rule | dict]: """Return a list of blocked client MACs""" - pass + raise NotImplementedError async def get_active_clients(self, interval_stats: bool = False) -> List: """Return a list of active clients""" @@ -139,7 +139,7 @@ async def get_ap_stats(self) -> List: async def get_ap_group_stats(self) -> List: """Return a list of AP group statistics""" - pass + raise NotImplementedError async def get_vap_stats(self) -> List: """Return a list of Virtual AP (per-radio WLAN) statistics""" @@ -147,75 +147,75 @@ async def get_vap_stats(self) -> List: async def get_wlan_group_stats(self) -> List: """Return a list of WLAN group statistics""" - pass + raise NotImplementedError async def get_dpsk_stats(self) -> List: """Return a list of AP group statistics""" - pass + raise NotImplementedError async def get_active_rogues(self) -> list[dict]: """Return a list of currently active rogue devices""" - pass + raise NotImplementedError async def get_known_rogues(self, limit: int = 300) -> list[dict]: """Return a list of known/recognized rogues devices""" - pass + raise NotImplementedError async def get_blocked_rogues(self, limit: int = 300) -> list[dict]: """Return a list of user blocked rogues devices""" - pass + raise NotImplementedError async def get_all_alarms(self, limit: int = 300) -> list[dict]: """Return a list of all alerts""" - pass + raise NotImplementedError async def get_all_events(self, limit: int = 300) -> list[dict]: """Return a list of all events""" - pass + raise NotImplementedError async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]: """Return a list of WLAN events""" - pass + raise NotImplementedError async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]: """Return a list of AP events""" - pass + raise NotImplementedError async def get_client_events(self, limit: int = 300) -> list[dict]: """Return a list of client events""" - pass + raise NotImplementedError async def get_wired_client_events(self, limit: int = 300) -> list[dict]: """Return a list of wired client events""" - pass + raise NotImplementedError async def get_syslog(self) -> str: """Return a list of syslog entries""" - pass + raise NotImplementedError async def get_backup(self) -> bytes: """Return a backup""" - pass + raise NotImplementedError async def do_block_client(self, mac: str) -> None: """Block a client""" - pass + raise NotImplementedError async def do_unblock_client(self, mac: str) -> None: """Unblock a client""" - pass + raise NotImplementedError async def do_delete_ap_group(self, name: str) -> bool: """Delete an AP group""" - pass + raise NotImplementedError async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None: """Disable a WLAN""" - pass + raise NotImplementedError async def do_enable_wlan(self, name: str) -> None: """Enable a WLAN""" - pass + raise NotImplementedError async def do_set_wlan_password( self, @@ -223,18 +223,18 @@ async def do_set_wlan_password( passphrase: str, sae_passphrase: str = None ) -> None: - pass + raise NotImplementedError async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None: """Hide AP LEDs""" - pass + raise NotImplementedError async def do_show_ap_leds(self, mac: str) -> None: """Show AP LEDs""" - pass + raise NotImplementedError async def do_restart_ap(self, mac: str) -> None: """Restart AP""" - pass + raise NotImplementedError \ No newline at end of file