diff --git a/requirements.txt b/requirements.txt index 437ff96..9fbc99e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,4 +27,5 @@ typing_extensions<4.6.0 pyjwt~=2.6.0 websockets~=12.0.0 pandas -xlsxwriter~=3.2.0 \ No newline at end of file +xlsxwriter~=3.2.0 +xmltodict \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index c88b08e..e8c5d04 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,7 @@ install_requires = websockets >= 12.0.0 pandas xlsxwriter >= 3.2.0 + xmltodict package_dir = = src diff --git a/src/powerpwn/cli/arguments.py b/src/powerpwn/cli/arguments.py index 57b019e..199e093 100644 --- a/src/powerpwn/cli/arguments.py +++ b/src/powerpwn/cli/arguments.py @@ -227,6 +227,19 @@ def copilot_studio_modules(parser: argparse.ArgumentParser, module: str): parser.add_argument("-t", "--timeout", help="The timeout for the enumeration process to have (in seconds)", default=300) +def module_powerpages(parser: argparse.ArgumentParser): + powerpages = parser.add_parser( + "powerpages", + description="Test anonymous access to dataverse tables via power pages, either via the apis or odata feeds.", + help="Test anonymous access to dataverse tables via power pages, either via the apis or odata feeds.", + ) + powerpages.add_argument( + "-url", + help="The url of the power pages domain to be tested. Format the url as such: 'https://.powerappsportals.com'", + required=True, + ) + + def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument("-l", "--log-level", default=logging.INFO, type=lambda x: getattr(logging, x), help="Configure the logging level.") @@ -240,6 +253,7 @@ def parse_arguments(): module_phishing(command_subparsers) module_copilot(command_subparsers) module_copilot_studio(command_subparsers) + module_powerpages(command_subparsers) args = parser.parse_args() diff --git a/src/powerpwn/cli/runners.py b/src/powerpwn/cli/runners.py index 8024727..e7d19c9 100644 --- a/src/powerpwn/cli/runners.py +++ b/src/powerpwn/cli/runners.py @@ -29,6 +29,7 @@ from powerpwn.powerdump.utils.auth import Auth, acquire_token, acquire_token_from_cached_refresh_token, get_cached_tenant from powerpwn.powerdump.utils.const import API_HUB_SCOPE, POWER_APPS_SCOPE from powerpwn.powerdump.utils.path_utils import collected_data_path, entities_path +from powerpwn.powerpages.powerpages import PowerPages from powerpwn.powerphishing.app_installer import AppInstaller logger = logging.getLogger(LOGGER_NAME) @@ -225,3 +226,7 @@ def run_copilot_studio_command(args): return raise NotImplementedError("Copilot studio command has not been implemented yet.") + + +def run_powerpages_command(args): + PowerPages(args) diff --git a/src/powerpwn/main.py b/src/powerpwn/main.py index fac842b..59b9ba4 100644 --- a/src/powerpwn/main.py +++ b/src/powerpwn/main.py @@ -12,6 +12,7 @@ run_gui_command, run_nocodemalware_command, run_phishing_command, + run_powerpages_command, run_recon_command, ) @@ -52,6 +53,8 @@ def main(): run_copilot_chat_command(args) elif command == "copilot-studio-hunter": run_copilot_studio_command(args) + elif command == "powerpages": + run_powerpages_command(args) else: logger.info("Run `powerpwn --help` for available commands.") diff --git a/src/powerpwn/powerpages/powerpages.py b/src/powerpwn/powerpages/powerpages.py new file mode 100644 index 0000000..378141b --- /dev/null +++ b/src/powerpwn/powerpages/powerpages.py @@ -0,0 +1,154 @@ +import requests +import xmltodict + + +class PowerPages: + """ + A class that is responsible for the Power Pages scan + """ + + def __init__(self, args): + self.args = args + url = self.normalize_url(args.url) + self.url = url + self.scan() + + def normalize_url(self, url): + if not url.startswith("https://"): + url = "https://" + url + if url.endswith("/"): + url = url[:-1] + url = url.replace("www.", "") + return url + + def scan(self): + url = self.url + print(f"Checking `{url}`") + try: + odata_tables, api_tables = self.get_odata_tables() + if len(odata_tables) == 0: + print("You are safe!") + else: + print(f"Found {len(odata_tables)} open tables in '{url}'.") + if len(api_tables): + print(f"Examining additional {len(api_tables)} potential tables through the api") + print("\nChecking each table:") + for table in odata_tables: + try: + self.get_odata_table_data(table) + except Exception: + print(f"Can't access table {table['name']} through the odata right now") + for table in api_tables: + try: + self.get_api_table_data(table) + except Exception: + print(f"Can't access table {table['name']} through the api right now") + except Exception: + print(f"Can't access `{url}` anonymously") + + def get_odata_tables(self): + url = self.url + odata_url = f"{url}/_odata/$metadata" + headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"} + resp = requests.get(odata_url, headers=headers, timeout=10) + odata_tables = [] + api_tables = [] + if 200 <= resp.status_code <= 299: + content_xml = resp.content.decode("utf-8") + resp_json = xmltodict.parse(content_xml) + entity_types = resp_json.get("edmx:Edmx", {}).get("edmx:DataServices", {}).get("Schema", {}).get("EntityType", []) + entity_sets = ( + resp_json.get("edmx:Edmx", {}).get("edmx:DataServices", {}).get("Schema", {}).get("EntityContainer", {}).get("EntitySet", []) + ) + table_names = {entity_set["@EntityType"][4:]: entity_set["@Name"] for entity_set in entity_sets} + + for entity in entity_types: + name = table_names.get(entity.get("@Name", "unknown"), "unknown") + key = entity.get("Key", {}).get("PropertyRef", {}).get("@Name", "unknown") + columns = [prop.get("@Name", "unknown") for prop in entity.get("Property", [])] + odata_tables.append({"name": name, "key": key, "columns": columns}) + if name.endswith("Set"): + api_table_name = name.lower()[:-3] + "s" + api_tables.append({"name": api_table_name, "key": key, "columns": columns}) + + return odata_tables, api_tables + + def get_api_table_data(self, table): + url = self.url + table_name = table["name"] + table_columns = table["columns"] + api_table_url = f"{url}/_api/{table_name}?$top=1" + headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"} + resp = requests.get(api_table_url, headers=headers, timeout=10) + if 200 <= resp.status_code <= 299: + resp_json = resp.json() + if len(resp_json["value"]) > 0: + print(f"Table {table_name} is exposed with all columns through the API!") + return len(table_columns) + else: + print(f"Table {table_name} is accessible but returned no data through the API!") + return 0 + elif resp.status_code in [400, 403]: + try: + resp_json = resp.json() + error_code = resp_json.get("error", {}).get("code", "") + if not error_code == "90040101": + raise Exception("Unknown error code") + except Exception: + print(f"Table {table_name} data is safe through the API") + return + print(f"Table {table_name} data is not exposed as a whole, checking each column...") + exposed = [] + for column in table_columns: + api_column_url = f"{url}/_api/{table_name}?select={column}&$top=1" + resp = requests.get(api_column_url, headers=headers, timeout=5) + if 200 <= resp.status_code <= 299: + resp_json = resp.json() + if len(resp_json["value"]) > 0: + exposed.append(column) + if len(exposed): + print(f"Table {table_name} has the following columns exposed: {', '.join(exposed)} through the API") + return len(exposed) + else: + print(f"Table {table_name} data is safe through the API") + return 0 + + def get_odata_table_data(self, table): + url = self.url + table_name = table["name"] + table_columns = table["columns"] + table_url = f"{url}/_odata/{table_name}?$top=1" + headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"} + resp = requests.get(table_url, headers=headers, timeout=10) + if 200 <= resp.status_code <= 299: + resp_json = resp.json() + if len(resp_json["value"]) > 0: + print(f"Table {table_name} is exposed with all columns through the odata!") + return len(table_columns) + else: + print(f"Table {table_name} is accessible but returned no data through the odata!") + return 0 + elif resp.status_code in [400, 403]: + try: + resp_json = resp.json() + error_code = resp_json.get("error", {}).get("code", "") + if not error_code == "90040101": + raise Exception("Unknown error code") + except Exception: + print(f"Table {table_name} data is safe through the odata") + return + print(f"Table {table_name} data is not exposed as a whole, checking each column...") + exposed = [] + for column in table_columns: + column_url = f"{url}/_odata/{table_name}?select={column}&$top=1" + resp = requests.get(column_url, headers=headers, timeout=5) + if 200 <= resp.status_code <= 299: + resp_json = resp.json() + if len(resp_json["value"]) > 0: + exposed.append(column) + if len(exposed): + print(f"Table {table_name} has the following columns exposed: {', '.join(exposed)} through the odata") + return len(exposed) + else: + print(f"Table {table_name} data is safe through the odata") + return 0