diff --git a/aioruckus/ajaxsession.py b/aioruckus/ajaxsession.py
index fe3417f..488223e 100644
--- a/aioruckus/ajaxsession.py
+++ b/aioruckus/ajaxsession.py
@@ -132,8 +132,8 @@ async def login(self) -> None:
raise ConnectionRefusedError(ERROR_CONNECT_TEMPORARY)
# pylint: disable=import-outside-toplevel
- from .nativeajaxapi import NativeAjaxApi
- self._api = NativeAjaxApi(self)
+ from .ruckusajaxapi import RuckusAjaxApi
+ self._api = RuckusAjaxApi(self)
return self
async def sz_login(self) -> None:
diff --git a/aioruckus/const.py b/aioruckus/const.py
index 4d5241d..83c4c00 100644
--- a/aioruckus/const.py
+++ b/aioruckus/const.py
@@ -18,6 +18,9 @@
ERROR_PASSPHRASE_MISSING = "WPA2 and Mixed WPA2/3 WLANs require a passphrase"
ERROR_SAEPASSPHRASE_MISSING = "WPA3 and Mixed WPA2/3 WLANs require an SAE passphrase"
ERROR_PASSPHRASE_NAME = "You must also provide a name if you wish to override the passphrase"
+ERROR_ACL_NOT_FOUND = "ACL not found"
+ERROR_ACL_TOO_BIG = "ACLs may only contain 128 stations"
+ERROR_ACL_SYSTEM = "Please use do_block_client() and do_unblock_client() to modify the System ACL"
class SystemStat(Enum):
"""Ruckus System Info section keys"""
diff --git a/aioruckus/nativeajaxapi.py b/aioruckus/nativeajaxapi.py
deleted file mode 100644
index 586ec8e..0000000
--- a/aioruckus/nativeajaxapi.py
+++ /dev/null
@@ -1,621 +0,0 @@
-"""Adds Unleashed / ZoneDirector AJAX Statistics and Command methods to RuckusApi"""
-
-from collections.abc import AsyncIterator
-import datetime
-import random
-from re import IGNORECASE, match
-from typing import Any, Dict, List
-import xml.etree.ElementTree as ET
-from xml.sax import saxutils
-import xmltodict
-
-from aioruckus.ruckusajaxapi import RuckusAjaxApi
-
-from .const import (
- ERROR_INVALID_MAC,
- ERROR_PASSPHRASE_LEN,
- ERROR_PASSPHRASE_JS,
- ERROR_PASSPHRASE_MISSING,
- ERROR_SAEPASSPHRASE_MISSING,
- ERROR_INVALID_WLAN,
- ERROR_PASSPHRASE_NAME,
- PatchNewAttributeMode,
- SystemStat,
- WlanEncryption
-)
-from .abcsession import ConfigItem
-from .ajaxsession import AjaxSession
-
-class NativeAjaxApi(RuckusAjaxApi):
- """Ruckus ZoneDirector or Unleashed Configuration, Statistics and Commands API"""
- def __init__(self, session: AjaxSession):
- super().__init__(session)
-
- async def get_system_info(self, *sections: SystemStat) -> dict:
- sections = (
- [s for section_list in sections for s in section_list.value]
- if sections else SystemStat.DEFAULT.value
- )
- section = ''.join(f"<{s}/>" for s in sections)
- sysinfo = await self.cmdstat(
- # language=XML
- f"{section}"
- )
- return sysinfo.get("response", sysinfo.get("system"))
-
- async def get_active_clients(self, interval_stats: bool = False) -> List:
- """Return a list of active clients"""
- if interval_stats:
- endtime = await self._get_timestamp_at_controller()
- starttime = endtime - 86400
- clientrequest = f""
- else:
- clientrequest = ""
- return await self.cmdstat(f"{clientrequest}", ["client"])
-
- async def get_inactive_clients(self) -> List:
- """Return a list of inactive clients"""
- return await self.cmdstat("", ["client"])
-
- async def get_ap_stats(self) -> List:
- """Return a list of AP statistics"""
- return await self.cmdstat(
- ""
- "", ["ap"]
- )
-
- async def get_ap_group_stats(self) -> List:
- """Return a list of AP group statistics"""
- return await self.cmdstat(
- ""
- "", ["group", "radio", "ap"]
- )
-
- async def get_vap_stats(self) -> List:
- """Return a list of Virtual AP (per-radio WLAN) statistics"""
- return await self.cmdstat(
- ""
- "", ["vap"]
- )
-
- async def get_wlan_group_stats(self) -> List:
- """Return a list of WLAN group statistics"""
- return await self.cmdstat(
- ""
- "", ["wlangroup", "wlan"]
- )
-
- async def get_dpsk_stats(self) -> List:
- """Return a list of AP group statistics"""
- return await self.cmdstat(
- ""
- "", ["dpsk"]
- )
-
- async def get_active_rogues(self) -> list[dict]:
- """Return a list of currently active rogue devices"""
- return await self.cmdstat(
- ""
- "", ["rogue"]
- )
-
- async def get_known_rogues(self, limit: int = 300) -> list[dict]:
- """Return a list of known/recognized rogues devices"""
- return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "recognized": "true"}, updater="krogue", limit=limit)]
-
- async def get_blocked_rogues(self, limit: int = 300) -> list[dict]:
- """Return a list of user blocked rogues devices"""
- return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "blocked": "true"}, updater="brogue", limit=limit)]
-
- async def get_all_alarms(self, limit: int = 300) -> list[dict]:
- """Return a list of all alerts"""
- return [alarm async for alarm in self.cmdstat_piecewise("eventd", "alarm", updater="page", limit=limit)]
-
- async def get_all_events(self, limit: int = 300) -> list[dict]:
- """Return a list of all events"""
- return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", limit=limit)]
-
- async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]:
- """Return a list of WLAN events"""
- return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"wlan": list(wlan_ids) if wlan_ids else "*"}, limit=limit)]
-
- async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]:
- """Return a list of AP events"""
- return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"ap": list(self._normalize_mac(mac) for mac in ap_macs) if ap_macs else "*"}, limit=limit)]
-
- async def get_client_events(self, limit: int = 300) -> list[dict]:
- """Return a list of client events"""
- return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "user"}, limit=limit)]
-
- async def get_wired_client_events(self, limit: int = 300) -> list[dict]:
- """Return a list of wired client events"""
- return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "wire"}, limit=limit)]
-
- async def get_syslog(self) -> str:
- """Return a list of syslog entries"""
- ts = self._ruckus_timestamp()
- syslog = await self.cmdstat(
- f""
- f""
- )
- return syslog["xmsg"]["res"]
-
- async def get_backup(self) -> bytes:
- """Return a backup"""
- request = self.session.base_url + "/_savebackup.jsp?time=" + self._ruckus_backup_timestamp()
- return await self.session.request_file(request, 60)
-
- async def do_block_client(self, mac: str) -> None:
- """Block a client"""
- mac = self._normalize_mac(mac)
- await self.cmdstat(
- f""
- f""
- f""
- )
-
- async def do_unblock_client(self, mac: str) -> None:
- """Unblock a client"""
- mac = self._normalize_mac(mac)
- blocked = await self.get_blocked_client_macs()
- remaining = ''.join((
- f"" for deny in blocked
- if deny["mac"] != mac
- ))
- await self._do_conf(
- f""
- f""
- f"{remaining}"
- )
-
- async def do_delete_ap_group(self, name: str) -> bool:
- """Delete an AP group"""
- ap_group = await self._find_ap_group_by_name(name)
- if ap_group is None:
- return False
- ts = self._ruckus_timestamp()
- await self._do_conf(
- f""
- f""
- )
- return True
-
- async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None:
- """Disable a WLAN"""
- wlan = await self._find_wlan_by_name(name)
- if wlan:
- ts = self._ruckus_timestamp()
- await self._do_conf(
- f""
- f""
- )
-
- async def do_enable_wlan(self, name: str) -> None:
- """Enable a WLAN"""
- await self.do_disable_wlan(name, False)
-
- async def do_set_wlan_password(
- self,
- name: str,
- passphrase: str,
- sae_passphrase: str = None
- ) -> None:
- """Set a WLAN password"""
- sae_passphrase = sae_passphrase or passphrase
- await self.do_edit_wlan(
- name, {"wpa": {"passphrase": passphrase, "sae-passphrase": sae_passphrase}}, True
- )
-
- async def do_add_wlan(
- self,
- name: str,
- encryption: WlanEncryption = WlanEncryption.WPA2,
- passphrase: str = None,
- sae_passphrase: str = None,
- ssid_override: str = None,
- ignore_unknown_attributes: bool = False
- ) -> None:
- """Add a WLAN"""
- patch = {"name": name, "ssid": ssid_override or name, "encryption": encryption.value}
- if passphrase is not None or sae_passphrase is not None:
- patch_wpa = {}
- patch["wpa"] = patch_wpa
- if passphrase is not None:
- patch_wpa["passphrase"] = passphrase
- if sae_passphrase is not None:
- patch_wpa["sae-passphrase"] = sae_passphrase
- await self.do_clone_wlan(patch)
-
- async def do_clone_wlan(
- self, template: dict, new_name: str = None, new_ssid: str = None
- ) -> None:
- """Clone a WLAN"""
- wlansvc = await self._get_default_wlan_template()
- self._normalize_encryption(wlansvc, template)
- self._patch_template(wlansvc, template, True)
- if new_name is not None or new_ssid is not None:
- if new_name is None:
- raise ValueError(ERROR_PASSPHRASE_NAME)
- self._patch_template(wlansvc, {"name": new_name, "ssid": new_ssid or new_name })
- await self._add_wlan_template(wlansvc)
-
- async def do_edit_wlan(
- self, name: str, patch: dict, patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR
- ) -> None:
- """Edit a WLAN"""
- wlansvc = await self._get_wlan_template(name)
- if wlansvc:
- self._normalize_encryption(wlansvc, patch)
- self._patch_template(wlansvc, patch, patch_new_attributes)
- await self._update_wlan_template(wlansvc)
-
- async def do_delete_wlan(self, name: str) -> bool:
- """Delete a WLAN"""
- wlan = await self._find_wlan_by_name(name)
- if wlan is None:
- return False
- ts = self._ruckus_timestamp()
- await self._do_conf(
- f""
- f"", timeout=20
- )
- return True
-
- async def do_add_wlan_group(self, name: str, description: str = "", wlans: List = None) -> None:
- """Add a WLAN group"""
- wlangroup = ET.Element("wlangroup", {"name": name, "description": description or ""})
- if wlans is not None:
- wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()}
- for wlansvc in wlans:
- wlan_name = None
- if isinstance(wlansvc, str):
- if wlansvc in wlan_map:
- wlan_name = wlansvc
- elif isinstance(wlansvc, dict):
- if "name" in wlansvc and wlansvc["name"] in wlan_map:
- wlan_name = wlansvc["name"]
- if wlan_name is None:
- raise ValueError(ERROR_INVALID_WLAN)
- ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlan_name]})
- await self._do_conf(
- f""
- f"{ET.tostring(wlangroup).decode('utf-8')}"
- )
-
- async def do_clone_wlan_group(self, template: dict, name: str, description: str = None) -> None:
- """Clone a WLAN group"""
- wlangroup = ET.Element("wlangroup", {
- "name": name,
- "description": description or template.get("description", "")
- })
- if "wlan" in template:
- wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()}
- for wlansvc in template["wlan"]:
- ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlansvc["name"]]})
- await self._do_conf(
- f""
- f"{ET.tostring(wlangroup).decode('utf-8')}"
- )
-
- async def do_delete_wlan_group(self, name: str) -> bool:
- """Delete a WLAN group"""
- wlang = await self._find_wlan_group_by_name(name)
- if wlang is None:
- return False
- ts = self._ruckus_timestamp()
- await self._do_conf(
- f""
- f""
- )
- return True
-
- async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None:
- """Hide AP LEDs"""
- mac = self._normalize_mac(mac)
- found_ap = await self._find_ap_by_mac(mac)
- if found_ap:
- ts = self._ruckus_timestamp()
- await self._do_conf(
- f""
- f""
- f""
- )
-
- async def do_show_ap_leds(self, mac: str) -> None:
- """Show AP LEDs"""
- await self.do_hide_ap_leds(mac, False)
-
- async def do_restart_ap(self, mac: str) -> None:
- """Restart AP"""
- mac = self._normalize_mac(mac)
- ts = self._ruckus_timestamp()
- await self._cmdstat_noparse(
- f""
- )
-
- async def _get_default_apgroup_template(self) -> ET.Element:
- """Get default AP group template"""
- xml = await self.session.get_conf_str(ConfigItem.APGROUP_TEMPLATE)
- root = ET.fromstring(xml)
- return root.find(".//apgroup")
-
- async def _get_default_wlan_template(self) -> ET.Element:
- """Get default WLAN template"""
- xml = await self.session.get_conf_str(ConfigItem.WLANSVC_STANDARD_TEMPLATE)
- root = ET.fromstring(xml)
- wlansvc = root.find(".//wlansvc")
- if wlansvc is not None:
- return wlansvc
- return self._get_default_cli_wlan_template()
-
- @staticmethod
- def _get_default_cli_wlan_template() -> ET.Element:
- wlansvc = ET.Element("wlansvc", {
- "name": "default-standard-wlan", "ssid": "", "authentication": "open",
- "encryption": "none", "is-guest": "false", "max-clients-per-radio": "100",
- "do-802-11d": "disabled", "sta-info-extraction": "1", "force-dhcp": "0",
- "force-dhcp-timeout": "10", "usage": "user", "policy-id": "", "policy6-id": "",
- "precedence-id": "1", "devicepolicy-id": "", "role-based-access-ctrl": "false",
- "acl-id": "1", "local-bridge": "1", "client-isolation": "disabled",
- "ci-whitelist-id": "0", "bgscan": "1", "idle-timeout": "1", "max-idle-timeout": "300",
- "dis-dgaf": "0", "authstats": "0", "https-redirection": "disabled"
- })
- ET.SubElement(wlansvc, "qos", {"uplink-preset": "DISABLE", "downlink-preset": "DISABLE"})
- ET.SubElement(wlansvc, "queue-priority", {
- "voice": "0", "video": "2", "data": "4", "background": "6"
- })
- ET.SubElement(wlansvc, "wlan-schedule", {
- "value": "0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:"
- "0x0:0x0:0x0:0x0: 0x0:0x0:0x0:0x0:0x0:0x0"
- })
- return wlansvc
-
- async def _get_wlan_template(self, name: str) -> ET.Element | None:
- xml = await self.session.get_conf_str(ConfigItem.WLANSVC_LIST)
- root = ET.fromstring(xml)
- wlansvc = root.find(f".//wlansvc[@name='{saxutils.escape(name)}']")
- return wlansvc
-
- def _normalize_encryption(self, wlansvc: ET.Element, patch: dict):
- patch_wpa = patch["wpa"] if "wpa" in patch else None
- if patch_wpa is not None:
- if "passphrase" in patch_wpa:
- self._validate_passphrase(patch_wpa["passphrase"])
- if "sae-passphrase" in patch_wpa:
- self._validate_passphrase(patch_wpa["sae-passphrase"])
-
- encryption = wlansvc.get("encryption")
- if "encryption" in patch and patch["encryption"] != encryption:
- new_encryption = patch["encryption"]
- wlansvc.set("encryption", new_encryption)
-
- wpa = wlansvc.find("wpa")
- new_wpa = {"cipher": "aes", "dynamic-psk": "disabled"}
-
- if new_encryption in (WlanEncryption.WPA2.value, WlanEncryption.WPA23_MIXED.value):
- passphrase = wpa.get("passphrase") if wpa is not None else None
- if not patch_wpa.get("passphrase") and passphrase is None:
- raise ValueError(ERROR_PASSPHRASE_MISSING)
- new_wpa["passphrase"] = passphrase or ""
- if new_encryption in (WlanEncryption.WPA3.value, WlanEncryption.WPA23_MIXED.value):
- sae_passphrase = wpa.get("sae_passphrase") if wpa is not None else None
- if not patch_wpa.get("sae_passphrase") and sae_passphrase is None:
- raise ValueError(ERROR_SAEPASSPHRASE_MISSING)
- new_wpa["sae-passphrase"] = sae_passphrase or ""
-
- if wpa is not None:
- wlansvc.remove(wpa)
- if new_encryption != WlanEncryption.NONE.value:
- wpa = ET.SubElement(wlansvc, "wpa", new_wpa)
-
- def _patch_template(
- self,
- element: ET.Element,
- patch: dict,
- patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR,
- current_path: str = ""
- ) -> None:
- visited_children = set()
- for child in element:
- if child.tag in patch and isinstance(patch[child.tag], dict):
- self._patch_template(
- child,
- patch[child.tag],
- patch_new_attributes,
- f"{current_path}/{child.tag}"
- )
- visited_children.add(child.tag)
- for name, value in patch.items():
- if name in visited_children:
- pass
- else:
- current_value = element.get(name)
- if isinstance(value, List):
- raise ValueError(f"Applying lists is unsupported: {current_path}/{name}")
- if current_value is None:
- if patch_new_attributes == PatchNewAttributeMode.ERROR:
- raise ValueError(f"Unknown attribute: {current_path}/{name}")
- elif patch_new_attributes == PatchNewAttributeMode.IGNORE:
- continue
- else:
- value = self._normalize_conf_value(current_value, value)
- element.set(name, value)
- x_name = f"x-{name}"
- if x_name not in patch and x_name in element.attrib:
- element.set(x_name, value)
-
- async def _update_wlan_template(self, wlansvc: ET.Element):
- """Update WLAN template"""
- xml_bytes = ET.tostring(wlansvc)
- await self._do_conf(
- f""
- f"{xml_bytes.decode('utf-8')}", timeout=20
- )
-
- async def _add_wlan_template(self, wlansvc: ET.Element):
- """Add WLAN template"""
- xml_bytes = ET.tostring(wlansvc)
- await self._do_conf(
- f""
- f"{xml_bytes.decode('utf-8')}", timeout=20
- )
-
- async def _find_ap_by_mac(self, mac: str) -> dict:
- """Find AP by MAC"""
- return next((ap for ap in await self.get_aps() if ap["mac"] == mac), None)
-
- async def _find_ap_group_by_name(self, name: str) -> dict:
- """Find AP group by name"""
- return next((
- ap_group for ap_group in await self.get_ap_groups() if ap_group["name"] == name
- ), None)
-
- async def _find_wlan_by_name(self, name: str) -> dict:
- """Find WLAN by name"""
- return next((
- wlan for wlan in await self.get_wlans() if wlan["name"] == name
- ), None)
-
- async def _find_wlan_group_by_name(self, name: str) -> dict:
- """Find WLAN group by name"""
- return next((
- wlang for wlang in await self.get_wlan_groups() if wlang["name"] == name
- ), None)
-
- async def _get_timestamp_at_controller(self) -> int:
- """Get timestamp at controller"""
- ts = self._ruckus_timestamp()
- time_info = await self.cmdstat(
- f""
- f""
- )
- return int(time_info["response"]["time"]["time"])
-
- async def _cmdstat_noparse(self, data: str, timeout: int | None = None) -> str:
- """Call cmdstat without parsing response"""
- return await self.session.request(self.session.cmdstat_url, data, timeout)
-
- async def cmdstat(
- self, data: str, collection_elements: List[str] = None, aggressive_unwrap: bool = True,
- timeout: int | None = None
- ) -> dict | List:
- """Call cmdstat and parse xml result"""
- result_text = await self._cmdstat_noparse(data, timeout)
- return self._ruckus_xml_unwrap(result_text, collection_elements, aggressive_unwrap)
-
- async def cmdstat_piecewise(
- self, comp: str, element_type: str, element_collection: str | None = None, filter: Dict[str, Any] | None = None, limit: int = 300, page_size: int | None = None, updater: str | None = None, timeout: int | None = None
- ) -> AsyncIterator[dict]:
- """Call cmdstat and parse piecewise xml results"""
-
- ts_time = self._ruckus_timestamp(random_part=False)
- ts_random = self._ruckus_timestamp(time_part=False)
- updater = updater or comp
- page_size = page_size or limit
-
- piece_stat = {
- "@pid": 0,
- "@start": 0,
- "@number": page_size,
- "@requestId": f"{updater}.{ts_time}",
- "@cleanupId": f"{updater}.{ts_time}.{ts_random}"
- }
-
- request = {"ajax-request": {
- "@action": "getstat",
- "@comp": comp,
- "@updater": f"{updater}.{ts_time}.{ts_random}",
- element_type : self._get_event_filter(filter),
- "pieceStat" : piece_stat
- }}
-
- pid = 0
- item_number = 0
- element_collection = element_collection or "response"
-
- while True:
- pid += 1
- if page_size > limit > 0:
- page_size = limit
-
- piece_stat["@pid"] = pid
- piece_stat["@start"] = item_number
- piece_stat["@number"] = page_size
-
- request_xml = xmltodict.unparse(request, full_document=False, short_empty_elements=True)
- response = (await self.cmdstat(request_xml, [element_type], aggressive_unwrap=False))[element_collection]
-
- if element_type not in response:
- return
- for element in response[element_type]:
- yield element
- item_number += 1
- if limit == 1:
- return
- limit -= 1
- if response["done"] == "true":
- return
-
- @staticmethod
- def _get_event_filter(filter: Dict[str, Any] = None, sort_by: str = "time", sort_descending: bool = True) -> str:
-
- result = {
- "@sortBy": sort_by,
- "@sortDirection": -1 if sort_descending else 1
- }
- if filter is not None:
- for key, values in filter.items():
- if isinstance(values, str):
- result[f"@{key}"] = values
- else:
- joined_values = f"|{'|'.join(values)}|"
- result[f"@{key}"] = joined_values
- return result
-
- async def _conf_noparse(self, data: str, timeout: int | None = None) -> str:
- """Call conf without parsing response"""
- return await self.session.request(self.session.conf_url, data, timeout)
-
- async def conf(
- self, data: str, collection_elements: List[str] = None, timeout: int | None = None
- ) -> dict | List:
- """Call conf and parse xml result"""
- result_text = await self._conf_noparse(data, timeout)
- return self._ruckus_xml_unwrap(result_text, collection_elements)
-
- async def _do_conf(
- self, data: str, collection_elements: List[str] = None, timeout: int | None = None
- ) -> None:
- """Call conf and confirm success"""
- result = await self.conf(data, collection_elements, timeout)
- if "xmsg" in result:
- raise ValueError(result["xmsg"]["lmsg"])
-
- @staticmethod
- def _ruckus_timestamp(time_part: bool = True, random_part: bool = True) -> str:
- return f"{int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000) if time_part else ''}{('.' if time_part and random_part else '')}{int(9000 * random.random()) + 1000 if random_part else ''}"
-
- @staticmethod
- def _ruckus_backup_timestamp() -> str:
- return datetime.datetime.now(datetime.timezone.utc).strftime("%m%d%y_%H_%M")
-
- @staticmethod
- def _normalize_mac(mac: str) -> str:
- """Normalize MAC address format"""
- if mac and match(r"(?:[0-9a-f]{2}[:-]){5}[0-9a-f]{2}", string=mac, flags=IGNORECASE):
- return mac.replace('-', ':').lower()
- raise ValueError(ERROR_INVALID_MAC)
-
- @staticmethod
- def _validate_passphrase(passphrase: str) -> str:
- """Validate passphrase against ZoneDirector/Unleashed rules"""
- if passphrase and match(r".*<.*>.*", string=passphrase):
- raise ValueError(ERROR_PASSPHRASE_JS)
- if passphrase and match(
- r"(^[!-~]([ -~]){6,61}[!-~]$)|(^([0-9a-fA-F]){64}$)", string=passphrase
- ):
- return passphrase
- raise ValueError(ERROR_PASSPHRASE_LEN)
-
\ No newline at end of file
diff --git a/aioruckus/ruckusajaxapi.py b/aioruckus/ruckusajaxapi.py
index ab272ea..82e63c0 100644
--- a/aioruckus/ruckusajaxapi.py
+++ b/aioruckus/ruckusajaxapi.py
@@ -1,185 +1,657 @@
-"""Adds AJAX Statistics and Command methods to RuckusApi"""
-
-from abc import abstractmethod
-from re import IGNORECASE, match
-from typing import List
-
-from .const import (
- ERROR_INVALID_MAC,
- ERROR_PASSPHRASE_LEN,
- ERROR_PASSPHRASE_JS,
- PatchNewAttributeMode,
- SystemStat
-)
-from .ruckusapi import RuckusApi
-
-class RuckusAjaxApi(RuckusApi):
- """Ruckus Configuration, Statistics and Commands API"""
-
- @abstractmethod
- async def get_system_info(self, *sections: SystemStat) -> dict:
- pass
-
- @abstractmethod
- async def get_active_clients(self, interval_stats: bool = False) -> List:
- """Return a list of active clients"""
- pass
-
- @abstractmethod
- async def get_inactive_clients(self) -> List:
- """Return a list of inactive clients"""
- pass
-
- @abstractmethod
- async def get_ap_stats(self) -> List:
- """Return a list of AP statistics"""
- pass
-
- @abstractmethod
- async def get_ap_group_stats(self) -> List:
- """Return a list of AP group statistics"""
- pass
-
- @abstractmethod
- async def get_vap_stats(self) -> List:
- """Return a list of Virtual AP (per-radio WLAN) statistics"""
- pass
-
- @abstractmethod
- async def get_wlan_group_stats(self) -> List:
- """Return a list of WLAN group statistics"""
- pass
-
- @abstractmethod
- async def get_dpsk_stats(self) -> List:
- """Return a list of AP group statistics"""
- pass
-
- @abstractmethod
- async def get_active_rogues(self) -> list[dict]:
- """Return a list of currently active rogue devices"""
- pass
-
- @abstractmethod
- async def get_known_rogues(self, limit: int = 300) -> list[dict]:
- """Return a list of known/recognized rogues devices"""
- pass
-
- @abstractmethod
- async def get_blocked_rogues(self, limit: int = 300) -> list[dict]:
- """Return a list of user blocked rogues devices"""
- pass
-
- @abstractmethod
- async def get_all_alarms(self, limit: int = 300) -> list[dict]:
- """Return a list of all alerts"""
- pass
-
- @abstractmethod
- async def get_all_events(self, limit: int = 300) -> list[dict]:
- """Return a list of all events"""
- pass
-
- @abstractmethod
- async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]:
- """Return a list of WLAN events"""
- pass
-
- @abstractmethod
- async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]:
- """Return a list of AP events"""
- pass
-
- @abstractmethod
- async def get_client_events(self, limit: int = 300) -> list[dict]:
- """Return a list of client events"""
- pass
-
- @abstractmethod
- async def get_wired_client_events(self, limit: int = 300) -> list[dict]:
- """Return a list of wired client events"""
- pass
-
- @abstractmethod
- async def get_syslog(self) -> str:
- """Return a list of syslog entries"""
- pass
-
- @abstractmethod
- async def get_backup(self) -> bytes:
- """Return a backup"""
- pass
-
- @abstractmethod
- async def do_block_client(self, mac: str) -> None:
- """Block a client"""
- pass
-
- @abstractmethod
- async def do_unblock_client(self, mac: str) -> None:
- """Unblock a client"""
- pass
-
- @abstractmethod
- async def do_delete_ap_group(self, name: str) -> bool:
- """Delete an AP group"""
- pass
-
- @abstractmethod
- async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None:
- """Disable a WLAN"""
- pass
-
- @abstractmethod
- async def do_enable_wlan(self, name: str) -> None:
- """Enable a WLAN"""
- pass
-
- @abstractmethod
- async def do_edit_wlan(
- self, name: str, patch: dict, patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR
- ) -> None:
- pass
-
- @abstractmethod
- async def do_set_wlan_password(
- self,
- name: str,
- passphrase: str,
- sae_passphrase: str = None
- ) -> None:
- pass
-
- @abstractmethod
- async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None:
- """Hide AP LEDs"""
- pass
-
- @abstractmethod
- async def do_show_ap_leds(self, mac: str) -> None:
- """Show AP LEDs"""
- pass
-
- @abstractmethod
- async def do_restart_ap(self, mac: str) -> None:
- """Restart AP"""
- pass
-
- @staticmethod
- def _normalize_mac(mac: str) -> str:
- """Normalize MAC address format"""
- if mac and match(r"(?:[0-9a-f]{2}[:-]){5}[0-9a-f]{2}", string=mac, flags=IGNORECASE):
- return mac.replace('-', ':').lower()
- raise ValueError(ERROR_INVALID_MAC)
-
- @staticmethod
- def _validate_passphrase(passphrase: str) -> str:
- """Validate passphrase against ZoneDirector/Unleashed rules"""
- if passphrase and match(r".*<.*>.*", string=passphrase):
- raise ValueError(ERROR_PASSPHRASE_JS)
- if passphrase and match(
- r"(^[!-~]([ -~]){6,61}[!-~]$)|(^([0-9a-fA-F]){64}$)", string=passphrase
- ):
- return passphrase
- raise ValueError(ERROR_PASSPHRASE_LEN)
+"""Adds AJAX Statistics and Command methods to RuckusApi"""
+
+from collections.abc import AsyncIterator
+import datetime
+import random
+from re import IGNORECASE, match
+from typing import Any, Dict, List
+import xml.etree.ElementTree as ET
+from xml.sax import saxutils
+import xmltodict
+
+from aioruckus.typing_policy import L2Policy
+
+from .const import (
+ ERROR_ACL_NOT_FOUND,
+ ERROR_ACL_SYSTEM,
+ ERROR_ACL_TOO_BIG,
+ ERROR_INVALID_MAC,
+ ERROR_PASSPHRASE_LEN,
+ ERROR_PASSPHRASE_JS,
+ ERROR_PASSPHRASE_MISSING,
+ ERROR_SAEPASSPHRASE_MISSING,
+ ERROR_INVALID_WLAN,
+ ERROR_PASSPHRASE_NAME,
+ PatchNewAttributeMode,
+ SystemStat,
+ WlanEncryption
+)
+from .abcsession import ConfigItem
+from .ajaxsession import AjaxSession
+from .ruckusapi import RuckusApi
+
+class RuckusAjaxApi(RuckusApi):
+ """Ruckus ZoneDirector or Unleashed Configuration, Statistics and Commands API"""
+ def __init__(self, session: AjaxSession):
+ super().__init__(session)
+
+ async def get_system_info(self, *sections: SystemStat) -> dict:
+ sections = (
+ [s for section_list in sections for s in section_list.value]
+ if sections else SystemStat.DEFAULT.value
+ )
+ section = ''.join(f"<{s}/>" for s in sections)
+ sysinfo = await self.cmdstat(
+ f"{section}"
+ )
+ return sysinfo.get("response", sysinfo.get("system"))
+
+ async def get_active_clients(self, interval_stats: bool = False) -> List:
+ """Return a list of active clients"""
+ if interval_stats:
+ endtime = await self._get_timestamp_at_controller()
+ starttime = endtime - 86400
+ clientrequest = f""
+ else:
+ clientrequest = ""
+ return await self.cmdstat(f"{clientrequest}", ["client"])
+
+ async def get_inactive_clients(self) -> List:
+ """Return a list of inactive clients"""
+ return await self.cmdstat("", ["client"])
+
+ async def get_ap_stats(self) -> List:
+ """Return a list of AP statistics"""
+ return await self.cmdstat(
+ ""
+ "", ["ap"]
+ )
+
+ async def get_ap_group_stats(self) -> List:
+ """Return a list of AP group statistics"""
+ return await self.cmdstat(
+ ""
+ "", ["group", "radio", "ap"]
+ )
+
+ async def get_vap_stats(self) -> List:
+ """Return a list of Virtual AP (per-radio WLAN) statistics"""
+ return await self.cmdstat(
+ ""
+ "", ["vap"]
+ )
+
+ async def get_wlan_group_stats(self) -> List:
+ """Return a list of WLAN group statistics"""
+ return await self.cmdstat(
+ ""
+ "", ["wlangroup", "wlan"]
+ )
+
+ async def get_dpsk_stats(self) -> List:
+ """Return a list of AP group statistics"""
+ return await self.cmdstat(
+ ""
+ "", ["dpsk"]
+ )
+
+ async def get_active_rogues(self) -> list[dict]:
+ """Return a list of currently active rogue devices"""
+ return await self.cmdstat(
+ ""
+ "", ["rogue"]
+ )
+
+ async def get_known_rogues(self, limit: int = 300) -> list[dict]:
+ """Return a list of known/recognized rogues devices"""
+ return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "recognized": "true"}, updater="krogue", limit=limit)]
+
+ async def get_blocked_rogues(self, limit: int = 300) -> list[dict]:
+ """Return a list of user blocked rogues devices"""
+ return [rogue async for rogue in self.cmdstat_piecewise("stamgr", "rogue", "apstamgr-stat", filter={"LEVEL": "1", "blocked": "true"}, updater="brogue", limit=limit)]
+
+ async def get_all_alarms(self, limit: int = 300) -> list[dict]:
+ """Return a list of all alerts"""
+ return [alarm async for alarm in self.cmdstat_piecewise("eventd", "alarm", updater="page", limit=limit)]
+
+ async def get_all_events(self, limit: int = 300) -> list[dict]:
+ """Return a list of all events"""
+ return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", limit=limit)]
+
+ async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]:
+ """Return a list of WLAN events"""
+ return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"wlan": list(wlan_ids) if wlan_ids else "*"}, limit=limit)]
+
+ async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]:
+ """Return a list of AP events"""
+ return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"ap": list(self._normalize_mac(mac) for mac in ap_macs) if ap_macs else "*"}, limit=limit)]
+
+ async def get_client_events(self, limit: int = 300) -> list[dict]:
+ """Return a list of client events"""
+ return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "user"}, limit=limit)]
+
+ async def get_wired_client_events(self, limit: int = 300) -> list[dict]:
+ """Return a list of wired client events"""
+ return [xevent async for xevent in self.cmdstat_piecewise("eventd", "xevent", filter={"c": "wire"}, limit=limit)]
+
+ async def get_syslog(self) -> str:
+ """Return a list of syslog entries"""
+ ts = self._ruckus_timestamp()
+ syslog = await self.cmdstat(
+ f""
+ f""
+ )
+ return syslog["xmsg"]["res"]
+
+ async def get_backup(self) -> bytes:
+ """Return a backup"""
+ request = self.session.base_url + "/_savebackup.jsp?time=" + self._ruckus_backup_timestamp()
+ return await self.session.request_file(request, 60)
+
+ async def do_block_client(self, mac: str) -> None:
+ """Block a client"""
+ mac = self._normalize_mac(mac)
+ await self.cmdstat(
+ f""
+ f""
+ f""
+ )
+
+ async def do_unblock_client(self, mac: str) -> None:
+ """Unblock a client"""
+ mac = self._normalize_mac(mac)
+ blocked = await self.get_blocked_client_macs()
+ remaining = ''.join((
+ f"" for deny in blocked
+ if deny["mac"] != mac
+ ))
+ await self._do_conf(
+ f""
+ f""
+ f"{remaining}"
+ )
+
+ async def do_set_acl_members(self, name: str, macs: list[str]) -> bool:
+ """Set ACL members"""
+ acl = await self._find_acl_by_name(name)
+ if acl is None:
+ raise ValueError(ERROR_ACL_NOT_FOUND)
+ if acl["id"] == 1:
+ raise ValueError(ERROR_ACL_SYSTEM)
+ if len(macs) > 128:
+ raise ValueError(ERROR_ACL_TOO_BIG)
+
+ macs = [self._normalize_mac(mac) for mac in macs]
+ acl_tag = "deny" if acl["default-mode"] == "allow" else "accept"
+
+ acl = ET.Element("acl", {
+ "id": acl["id"],
+ "name": acl["name"],
+ "description": acl["description"],
+ "default-mode": acl["default-mode"]
+ })
+ for mac in macs:
+ ET.SubElement(acl, acl_tag, {"mac": mac})
+
+ await self._do_conf(
+ f""
+ f"{ET.tostring(acl).decode('utf-8')}"
+ )
+
+ async def do_delete_ap_group(self, name: str) -> bool:
+ """Delete an AP group"""
+ ap_group = await self._find_ap_group_by_name(name)
+ if ap_group is None:
+ return False
+ ts = self._ruckus_timestamp()
+ await self._do_conf(
+ f""
+ f""
+ )
+ return True
+
+ async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None:
+ """Disable a WLAN"""
+ wlan = await self._find_wlan_by_name(name)
+ if wlan:
+ ts = self._ruckus_timestamp()
+ await self._do_conf(
+ f""
+ f""
+ )
+
+ async def do_enable_wlan(self, name: str) -> None:
+ """Enable a WLAN"""
+ await self.do_disable_wlan(name, False)
+
+ async def do_set_wlan_password(
+ self,
+ name: str,
+ passphrase: str,
+ sae_passphrase: str = None
+ ) -> None:
+ """Set a WLAN password"""
+ sae_passphrase = sae_passphrase or passphrase
+ await self.do_edit_wlan(
+ name, {"wpa": {"passphrase": passphrase, "sae-passphrase": sae_passphrase}}, True
+ )
+
+ async def do_add_wlan(
+ self,
+ name: str,
+ encryption: WlanEncryption = WlanEncryption.WPA2,
+ passphrase: str = None,
+ sae_passphrase: str = None,
+ ssid_override: str = None,
+ ignore_unknown_attributes: bool = False
+ ) -> None:
+ """Add a WLAN"""
+ patch = {"name": name, "ssid": ssid_override or name, "encryption": encryption.value}
+ if passphrase is not None or sae_passphrase is not None:
+ patch_wpa = {}
+ patch["wpa"] = patch_wpa
+ if passphrase is not None:
+ patch_wpa["passphrase"] = passphrase
+ if sae_passphrase is not None:
+ patch_wpa["sae-passphrase"] = sae_passphrase
+ await self.do_clone_wlan(patch)
+
+ async def do_clone_wlan(
+ self, template: dict, new_name: str = None, new_ssid: str = None
+ ) -> None:
+ """Clone a WLAN"""
+ wlansvc = await self._get_default_wlan_template()
+ self._normalize_encryption(wlansvc, template)
+ self._patch_template(wlansvc, template, True)
+ if new_name is not None or new_ssid is not None:
+ if new_name is None:
+ raise ValueError(ERROR_PASSPHRASE_NAME)
+ self._patch_template(wlansvc, {"name": new_name, "ssid": new_ssid or new_name })
+ await self._add_wlan_template(wlansvc)
+
+ async def do_edit_wlan(
+ self, name: str, patch: dict, patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR
+ ) -> None:
+ """Edit a WLAN"""
+ wlansvc = await self._get_wlan_template(name)
+ if wlansvc:
+ self._normalize_encryption(wlansvc, patch)
+ self._patch_template(wlansvc, patch, patch_new_attributes)
+ await self._update_wlan_template(wlansvc)
+
+ async def do_delete_wlan(self, name: str) -> bool:
+ """Delete a WLAN"""
+ wlan = await self._find_wlan_by_name(name)
+ if wlan is None:
+ return False
+ ts = self._ruckus_timestamp()
+ await self._do_conf(
+ f""
+ f"", timeout=20
+ )
+ return True
+
+ async def do_add_wlan_group(self, name: str, description: str = "", wlans: List = None) -> None:
+ """Add a WLAN group"""
+ wlangroup = ET.Element("wlangroup", {"name": name, "description": description or ""})
+ if wlans is not None:
+ wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()}
+ for wlansvc in wlans:
+ wlan_name = None
+ if isinstance(wlansvc, str):
+ if wlansvc in wlan_map:
+ wlan_name = wlansvc
+ elif isinstance(wlansvc, dict):
+ if "name" in wlansvc and wlansvc["name"] in wlan_map:
+ wlan_name = wlansvc["name"]
+ if wlan_name is None:
+ raise ValueError(ERROR_INVALID_WLAN)
+ ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlan_name]})
+ await self._do_conf(
+ f""
+ f"{ET.tostring(wlangroup).decode('utf-8')}"
+ )
+
+ async def do_clone_wlan_group(self, template: dict, name: str, description: str = None) -> None:
+ """Clone a WLAN group"""
+ wlangroup = ET.Element("wlangroup", {
+ "name": name,
+ "description": description or template.get("description", "")
+ })
+ if "wlan" in template:
+ wlan_map = {wlan["name"]:wlan["id"] for wlan in await self.get_wlans()}
+ for wlansvc in template["wlan"]:
+ ET.SubElement(wlangroup, "wlansvc", {"id": wlan_map[wlansvc["name"]]})
+ await self._do_conf(
+ f""
+ f"{ET.tostring(wlangroup).decode('utf-8')}"
+ )
+
+ async def do_delete_wlan_group(self, name: str) -> bool:
+ """Delete a WLAN group"""
+ wlang = await self._find_wlan_group_by_name(name)
+ if wlang is None:
+ return False
+ ts = self._ruckus_timestamp()
+ await self._do_conf(
+ f""
+ f""
+ )
+ return True
+
+ async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None:
+ """Hide AP LEDs"""
+ mac = self._normalize_mac(mac)
+ found_ap = await self._find_ap_by_mac(mac)
+ if found_ap:
+ ts = self._ruckus_timestamp()
+ await self._do_conf(
+ f""
+ f""
+ f""
+ )
+
+ async def do_show_ap_leds(self, mac: str) -> None:
+ """Show AP LEDs"""
+ await self.do_hide_ap_leds(mac, False)
+
+ async def do_restart_ap(self, mac: str) -> None:
+ """Restart AP"""
+ mac = self._normalize_mac(mac)
+ ts = self._ruckus_timestamp()
+ await self._cmdstat_noparse(
+ f""
+ )
+
+ async def _get_default_apgroup_template(self) -> ET.Element:
+ """Get default AP group template"""
+ xml = await self.session.get_conf_str(ConfigItem.APGROUP_TEMPLATE)
+ root = ET.fromstring(xml)
+ return root.find(".//apgroup")
+
+ async def _get_default_wlan_template(self) -> ET.Element:
+ """Get default WLAN template"""
+ xml = await self.session.get_conf_str(ConfigItem.WLANSVC_STANDARD_TEMPLATE)
+ root = ET.fromstring(xml)
+ wlansvc = root.find(".//wlansvc")
+ if wlansvc is not None:
+ return wlansvc
+ return self._get_default_cli_wlan_template()
+
+ @staticmethod
+ def _get_default_cli_wlan_template() -> ET.Element:
+ wlansvc = ET.Element("wlansvc", {
+ "name": "default-standard-wlan", "ssid": "", "authentication": "open",
+ "encryption": "none", "is-guest": "false", "max-clients-per-radio": "100",
+ "do-802-11d": "disabled", "sta-info-extraction": "1", "force-dhcp": "0",
+ "force-dhcp-timeout": "10", "usage": "user", "policy-id": "", "policy6-id": "",
+ "precedence-id": "1", "devicepolicy-id": "", "role-based-access-ctrl": "false",
+ "acl-id": "1", "local-bridge": "1", "client-isolation": "disabled",
+ "ci-whitelist-id": "0", "bgscan": "1", "idle-timeout": "1", "max-idle-timeout": "300",
+ "dis-dgaf": "0", "authstats": "0", "https-redirection": "disabled"
+ })
+ ET.SubElement(wlansvc, "qos", {"uplink-preset": "DISABLE", "downlink-preset": "DISABLE"})
+ ET.SubElement(wlansvc, "queue-priority", {
+ "voice": "0", "video": "2", "data": "4", "background": "6"
+ })
+ ET.SubElement(wlansvc, "wlan-schedule", {
+ "value": "0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:0x0:"
+ "0x0:0x0:0x0:0x0: 0x0:0x0:0x0:0x0:0x0:0x0"
+ })
+ return wlansvc
+
+ async def _get_wlan_template(self, name: str) -> ET.Element | None:
+ xml = await self.session.get_conf_str(ConfigItem.WLANSVC_LIST)
+ root = ET.fromstring(xml)
+ wlansvc = root.find(f".//wlansvc[@name='{saxutils.escape(name)}']")
+ return wlansvc
+
+ def _normalize_encryption(self, wlansvc: ET.Element, patch: dict):
+ patch_wpa = patch["wpa"] if "wpa" in patch else None
+ if patch_wpa is not None:
+ if "passphrase" in patch_wpa:
+ self._validate_passphrase(patch_wpa["passphrase"])
+ if "sae-passphrase" in patch_wpa:
+ self._validate_passphrase(patch_wpa["sae-passphrase"])
+
+ encryption = wlansvc.get("encryption")
+ if "encryption" in patch and patch["encryption"] != encryption:
+ new_encryption = patch["encryption"]
+ wlansvc.set("encryption", new_encryption)
+
+ wpa = wlansvc.find("wpa")
+ new_wpa = {"cipher": "aes", "dynamic-psk": "disabled"}
+
+ if new_encryption in (WlanEncryption.WPA2.value, WlanEncryption.WPA23_MIXED.value):
+ passphrase = wpa.get("passphrase") if wpa is not None else None
+ if not patch_wpa.get("passphrase") and passphrase is None:
+ raise ValueError(ERROR_PASSPHRASE_MISSING)
+ new_wpa["passphrase"] = passphrase or ""
+ if new_encryption in (WlanEncryption.WPA3.value, WlanEncryption.WPA23_MIXED.value):
+ sae_passphrase = wpa.get("sae_passphrase") if wpa is not None else None
+ if not patch_wpa.get("sae_passphrase") and sae_passphrase is None:
+ raise ValueError(ERROR_SAEPASSPHRASE_MISSING)
+ new_wpa["sae-passphrase"] = sae_passphrase or ""
+
+ if wpa is not None:
+ wlansvc.remove(wpa)
+ if new_encryption != WlanEncryption.NONE.value:
+ wpa = ET.SubElement(wlansvc, "wpa", new_wpa)
+
+ def _patch_template(
+ self,
+ element: ET.Element,
+ patch: dict,
+ patch_new_attributes: PatchNewAttributeMode = PatchNewAttributeMode.ERROR,
+ current_path: str = ""
+ ) -> None:
+ visited_children = set()
+ for child in element:
+ if child.tag in patch and isinstance(patch[child.tag], dict):
+ self._patch_template(
+ child,
+ patch[child.tag],
+ patch_new_attributes,
+ f"{current_path}/{child.tag}"
+ )
+ visited_children.add(child.tag)
+ for name, value in patch.items():
+ if name in visited_children:
+ pass
+ else:
+ current_value = element.get(name)
+ if isinstance(value, List):
+ raise ValueError(f"Applying lists is unsupported: {current_path}/{name}")
+ if current_value is None:
+ if patch_new_attributes == PatchNewAttributeMode.ERROR:
+ raise ValueError(f"Unknown attribute: {current_path}/{name}")
+ elif patch_new_attributes == PatchNewAttributeMode.IGNORE:
+ continue
+ else:
+ value = self._normalize_conf_value(current_value, value)
+ element.set(name, value)
+ x_name = f"x-{name}"
+ if x_name not in patch and x_name in element.attrib:
+ element.set(x_name, value)
+
+ async def _update_wlan_template(self, wlansvc: ET.Element):
+ """Update WLAN template"""
+ xml_bytes = ET.tostring(wlansvc)
+ await self._do_conf(
+ f""
+ f"{xml_bytes.decode('utf-8')}", timeout=20
+ )
+
+ async def _add_wlan_template(self, wlansvc: ET.Element):
+ """Add WLAN template"""
+ xml_bytes = ET.tostring(wlansvc)
+ await self._do_conf(
+ f""
+ f"{xml_bytes.decode('utf-8')}", timeout=20
+ )
+
+ async def _find_ap_by_mac(self, mac: str) -> dict:
+ """Find AP by MAC"""
+ return next((ap for ap in await self.get_aps() if ap["mac"] == mac), None)
+
+ async def _find_ap_group_by_name(self, name: str) -> dict:
+ """Find AP group by name"""
+ return next((
+ ap_group for ap_group in await self.get_ap_groups() if ap_group["name"] == name
+ ), None)
+
+ async def _find_wlan_by_name(self, name: str) -> dict:
+ """Find WLAN by name"""
+ return next((
+ wlan for wlan in await self.get_wlans() if wlan["name"] == name
+ ), None)
+
+ async def _find_wlan_group_by_name(self, name: str) -> dict:
+ """Find WLAN group by name"""
+ return next((
+ wlang for wlang in await self.get_wlan_groups() if wlang["name"] == name
+ ), None)
+
+ async def _find_acl_by_name(self, name: str) -> L2Policy | dict:
+ """Find L2 ACL by name"""
+ return next((
+ acl for acl in await self.get_acls() if acl["name"] == name
+ ), None)
+
+ async def _get_timestamp_at_controller(self) -> int:
+ """Get timestamp at controller"""
+ ts = self._ruckus_timestamp()
+ time_info = await self.cmdstat(
+ f""
+ f""
+ )
+ return int(time_info["response"]["time"]["time"])
+
+ async def _cmdstat_noparse(self, data: str, timeout: int | None = None) -> str:
+ """Call cmdstat without parsing response"""
+ return await self.session.request(self.session.cmdstat_url, data, timeout)
+
+ async def cmdstat(
+ self, data: str, collection_elements: List[str] = None, aggressive_unwrap: bool = True,
+ timeout: int | None = None
+ ) -> dict | List:
+ """Call cmdstat and parse xml result"""
+ result_text = await self._cmdstat_noparse(data, timeout)
+ return self._ruckus_xml_unwrap(result_text, collection_elements, aggressive_unwrap)
+
+ async def cmdstat_piecewise(
+ self, comp: str, element_type: str, element_collection: str | None = None, filter: Dict[str, Any] | None = None, limit: int = 300, page_size: int | None = None, updater: str | None = None, timeout: int | None = None
+ ) -> AsyncIterator[dict]:
+ """Call cmdstat and parse piecewise xml results"""
+
+ ts_time = self._ruckus_timestamp(random_part=False)
+ ts_random = self._ruckus_timestamp(time_part=False)
+ updater = updater or comp
+ page_size = page_size or limit
+
+ piece_stat = {
+ "@pid": 0,
+ "@start": 0,
+ "@number": page_size,
+ "@requestId": f"{updater}.{ts_time}",
+ "@cleanupId": f"{updater}.{ts_time}.{ts_random}"
+ }
+
+ request = {"ajax-request": {
+ "@action": "getstat",
+ "@comp": comp,
+ "@updater": f"{updater}.{ts_time}.{ts_random}",
+ element_type : self._get_event_filter(filter),
+ "pieceStat" : piece_stat
+ }}
+
+ pid = 0
+ item_number = 0
+ element_collection = element_collection or "response"
+
+ while True:
+ pid += 1
+ if page_size > limit > 0:
+ page_size = limit
+
+ piece_stat["@pid"] = pid
+ piece_stat["@start"] = item_number
+ piece_stat["@number"] = page_size
+
+ request_xml = xmltodict.unparse(request, full_document=False, short_empty_elements=True)
+ response = (await self.cmdstat(request_xml, [element_type], aggressive_unwrap=False))[element_collection]
+
+ if element_type not in response:
+ return
+ for element in response[element_type]:
+ yield element
+ item_number += 1
+ if limit == 1:
+ return
+ limit -= 1
+ if response["done"] == "true":
+ return
+
+ @staticmethod
+ def _get_event_filter(filter: Dict[str, Any] = None, sort_by: str = "time", sort_descending: bool = True) -> str:
+
+ result = {
+ "@sortBy": sort_by,
+ "@sortDirection": -1 if sort_descending else 1
+ }
+ if filter is not None:
+ for key, values in filter.items():
+ if isinstance(values, str):
+ result[f"@{key}"] = values
+ else:
+ joined_values = f"|{'|'.join(values)}|"
+ result[f"@{key}"] = joined_values
+ return result
+
+ async def _conf_noparse(self, data: str, timeout: int | None = None) -> str:
+ """Call conf without parsing response"""
+ return await self.session.request(self.session.conf_url, data, timeout)
+
+ async def conf(
+ self, data: str, collection_elements: List[str] = None, timeout: int | None = None
+ ) -> dict | List:
+ """Call conf and parse xml result"""
+ result_text = await self._conf_noparse(data, timeout)
+ return self._ruckus_xml_unwrap(result_text, collection_elements)
+
+ async def _do_conf(
+ self, data: str, collection_elements: List[str] = None, timeout: int | None = None
+ ) -> None:
+ """Call conf and confirm success"""
+ result = await self.conf(data, collection_elements, timeout)
+ if "xmsg" in result:
+ raise ValueError(result["xmsg"]["lmsg"])
+
+ @staticmethod
+ def _ruckus_timestamp(time_part: bool = True, random_part: bool = True) -> str:
+ return f"{int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000) if time_part else ''}{('.' if time_part and random_part else '')}{int(9000 * random.random()) + 1000 if random_part else ''}"
+
+ @staticmethod
+ def _ruckus_backup_timestamp() -> str:
+ return datetime.datetime.now(datetime.timezone.utc).strftime("%m%d%y_%H_%M")
+
+ @staticmethod
+ def _normalize_mac(mac: str) -> str:
+ """Normalize MAC address format"""
+ if mac and match(r"(?:[0-9a-f]{2}[:-]){5}[0-9a-f]{2}", string=mac, flags=IGNORECASE):
+ return mac.replace('-', ':').lower()
+ raise ValueError(ERROR_INVALID_MAC)
+
+ @staticmethod
+ def _validate_passphrase(passphrase: str) -> str:
+ """Validate passphrase against ZoneDirector/Unleashed rules"""
+ if passphrase and match(r".*<.*>.*", string=passphrase):
+ raise ValueError(ERROR_PASSPHRASE_JS)
+ if passphrase and match(
+ r"(^[!-~]([ -~]){6,61}[!-~]$)|(^([0-9a-fA-F]){64}$)", string=passphrase
+ ):
+ return passphrase
+ raise ValueError(ERROR_PASSPHRASE_LEN)
\ No newline at end of file
diff --git a/aioruckus/ruckusapi.py b/aioruckus/ruckusapi.py
index 9a48438..682060c 100644
--- a/aioruckus/ruckusapi.py
+++ b/aioruckus/ruckusapi.py
@@ -332,6 +332,11 @@ def _process_ruckus_xml(path, key, value):
if key == "apstamgr-stat" and not value:
# return an empty array rather than None, for ease of use
return key, []
+ if (
+ (key == "accept" or key == "deny") and not value and
+ path and len(path) > 0 and path[-1][0] == "acl"
+ ):
+ return key, []
if (
key == "status" and
value and value.isnumeric() and
diff --git a/aioruckus/smartzoneajaxapi.py b/aioruckus/smartzoneajaxapi.py
index bfbb921..c383add 100644
--- a/aioruckus/smartzoneajaxapi.py
+++ b/aioruckus/smartzoneajaxapi.py
@@ -1,4 +1,4 @@
-"""Adds AJAX Statistics and Command methods to RuckusApi"""
+"""Adds enough AJAX methods to RuckusApi to support Home Assistant"""
from re import IGNORECASE, match
from typing import List
@@ -26,59 +26,59 @@ async def get_aps(self) -> List[dict]:
async def get_ap_groups(self) -> List:
"""Return a list of AP groups"""
- pass
+ raise NotImplementedError
async def get_wlans(self) -> List[dict]:
"""Return a list of WLANs"""
- pass
+ raise NotImplementedError
async def get_wlan_groups(self) -> List[dict]:
"""Return a list of WLAN groups"""
- pass
+ raise NotImplementedError
async def get_urlfiltering_policies(self) -> list[UrlFilter | dict]:
"""Return a list of URL Filtering Policies"""
- pass
+ raise NotImplementedError
async def get_urlfiltering_blockingcategories(self) -> list[UrlBlockCategory | dict]:
"""Return a list of URL Filtering Blocking Categories"""
- pass
+ raise NotImplementedError
async def get_ip4_policies(self) -> list[Ip4Policy | dict]:
"""Return a list of IP4 Policies"""
- pass
+ raise NotImplementedError
async def get_ip6_policies(self) -> list[Ip6Policy | dict]:
"""Return a list of IP6 Policies"""
- pass
+ raise NotImplementedError
async def get_device_policies(self) -> list[DevicePolicy | dict]:
"""Return a list of Device Policies"""
- pass
+ raise NotImplementedError
async def get_precedence_policies(self) -> list[PrecedencePolicy | dict]:
"""Return a list of Precedence Policies"""
- pass
+ raise NotImplementedError
async def get_arc_policies(self) -> list[ArcPolicy | dict]:
"""Return a list of Application Recognition & Control Policies"""
- pass
+ raise NotImplementedError
async def get_arc_applications(self) -> list[ArcApplication | dict]:
"""Return a list of Application Recognition & Control User Defined Applications"""
- pass
+ raise NotImplementedError
async def get_arc_ports(self) -> list[ArcPort | dict]:
"""Return a list of Application Recognition & Control User Defined Ports"""
- pass
+ raise NotImplementedError
async def get_roles(self) -> list[Role | dict]:
"""Return a list of Roles"""
- pass
+ raise NotImplementedError
async def get_dpsks(self) -> list[Dpsk | dict]:
"""Return a list of DPSKs"""
- pass
+ raise NotImplementedError
async def get_system_info(self, *sections: SystemStat) -> dict:
"""Return system information"""
@@ -104,15 +104,15 @@ async def __get_cluster_state(self) -> dict:
async def get_zerotouch_mesh_ap_serials(self) -> dict:
"""Return a list of Pre-approved AP serial numbers"""
- pass
+ raise NotImplementedError
async def get_acls(self) -> list[L2Policy | dict]:
"""Return a list of ACLs"""
- pass
+ raise NotImplementedError
async def get_blocked_client_macs(self) -> list[L2Rule | dict]:
"""Return a list of blocked client MACs"""
- pass
+ raise NotImplementedError
async def get_active_clients(self, interval_stats: bool = False) -> List:
"""Return a list of active clients"""
@@ -139,7 +139,7 @@ async def get_ap_stats(self) -> List:
async def get_ap_group_stats(self) -> List:
"""Return a list of AP group statistics"""
- pass
+ raise NotImplementedError
async def get_vap_stats(self) -> List:
"""Return a list of Virtual AP (per-radio WLAN) statistics"""
@@ -147,75 +147,75 @@ async def get_vap_stats(self) -> List:
async def get_wlan_group_stats(self) -> List:
"""Return a list of WLAN group statistics"""
- pass
+ raise NotImplementedError
async def get_dpsk_stats(self) -> List:
"""Return a list of AP group statistics"""
- pass
+ raise NotImplementedError
async def get_active_rogues(self) -> list[dict]:
"""Return a list of currently active rogue devices"""
- pass
+ raise NotImplementedError
async def get_known_rogues(self, limit: int = 300) -> list[dict]:
"""Return a list of known/recognized rogues devices"""
- pass
+ raise NotImplementedError
async def get_blocked_rogues(self, limit: int = 300) -> list[dict]:
"""Return a list of user blocked rogues devices"""
- pass
+ raise NotImplementedError
async def get_all_alarms(self, limit: int = 300) -> list[dict]:
"""Return a list of all alerts"""
- pass
+ raise NotImplementedError
async def get_all_events(self, limit: int = 300) -> list[dict]:
"""Return a list of all events"""
- pass
+ raise NotImplementedError
async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]:
"""Return a list of WLAN events"""
- pass
+ raise NotImplementedError
async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]:
"""Return a list of AP events"""
- pass
+ raise NotImplementedError
async def get_client_events(self, limit: int = 300) -> list[dict]:
"""Return a list of client events"""
- pass
+ raise NotImplementedError
async def get_wired_client_events(self, limit: int = 300) -> list[dict]:
"""Return a list of wired client events"""
- pass
+ raise NotImplementedError
async def get_syslog(self) -> str:
"""Return a list of syslog entries"""
- pass
+ raise NotImplementedError
async def get_backup(self) -> bytes:
"""Return a backup"""
- pass
+ raise NotImplementedError
async def do_block_client(self, mac: str) -> None:
"""Block a client"""
- pass
+ raise NotImplementedError
async def do_unblock_client(self, mac: str) -> None:
"""Unblock a client"""
- pass
+ raise NotImplementedError
async def do_delete_ap_group(self, name: str) -> bool:
"""Delete an AP group"""
- pass
+ raise NotImplementedError
async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None:
"""Disable a WLAN"""
- pass
+ raise NotImplementedError
async def do_enable_wlan(self, name: str) -> None:
"""Enable a WLAN"""
- pass
+ raise NotImplementedError
async def do_set_wlan_password(
self,
@@ -223,18 +223,18 @@ async def do_set_wlan_password(
passphrase: str,
sae_passphrase: str = None
) -> None:
- pass
+ raise NotImplementedError
async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None:
"""Hide AP LEDs"""
- pass
+ raise NotImplementedError
async def do_show_ap_leds(self, mac: str) -> None:
"""Show AP LEDs"""
- pass
+ raise NotImplementedError
async def do_restart_ap(self, mac: str) -> None:
"""Restart AP"""
- pass
+ raise NotImplementedError
\ No newline at end of file