Skip to content

Commit

Permalink
Merge pull request #83 from mbrg/feature/anonymous_powerpages_check
Browse files Browse the repository at this point in the history
power pages module
  • Loading branch information
ofrin-zen authored Nov 15, 2024
2 parents 1fa2420 + 12ad1b4 commit c02b0c8
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 1 deletion.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ typing_extensions<4.6.0
pyjwt~=2.6.0
websockets~=12.0.0
pandas
xlsxwriter~=3.2.0
xlsxwriter~=3.2.0
xmltodict
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ install_requires =
websockets >= 12.0.0
pandas
xlsxwriter >= 3.2.0
xmltodict

package_dir =
= src
Expand Down
14 changes: 14 additions & 0 deletions src/powerpwn/cli/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,19 @@ def copilot_studio_modules(parser: argparse.ArgumentParser, module: str):
parser.add_argument("-t", "--timeout", help="The timeout for the enumeration process to have (in seconds)", default=300)


def module_powerpages(parser: argparse.ArgumentParser):
powerpages = parser.add_parser(
"powerpages",
description="Test anonymous access to dataverse tables via power pages, either via the apis or odata feeds.",
help="Test anonymous access to dataverse tables via power pages, either via the apis or odata feeds.",
)
powerpages.add_argument(
"-url",
help="The url of the power pages domain to be tested. Format the url as such: 'https://<your_domain>.powerappsportals.com'",
required=True,
)


def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("-l", "--log-level", default=logging.INFO, type=lambda x: getattr(logging, x), help="Configure the logging level.")
Expand All @@ -240,6 +253,7 @@ def parse_arguments():
module_phishing(command_subparsers)
module_copilot(command_subparsers)
module_copilot_studio(command_subparsers)
module_powerpages(command_subparsers)

args = parser.parse_args()

Expand Down
5 changes: 5 additions & 0 deletions src/powerpwn/cli/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from powerpwn.powerdump.utils.auth import Auth, acquire_token, acquire_token_from_cached_refresh_token, get_cached_tenant
from powerpwn.powerdump.utils.const import API_HUB_SCOPE, POWER_APPS_SCOPE
from powerpwn.powerdump.utils.path_utils import collected_data_path, entities_path
from powerpwn.powerpages.powerpages import PowerPages
from powerpwn.powerphishing.app_installer import AppInstaller

logger = logging.getLogger(LOGGER_NAME)
Expand Down Expand Up @@ -225,3 +226,7 @@ def run_copilot_studio_command(args):
return

raise NotImplementedError("Copilot studio command has not been implemented yet.")


def run_powerpages_command(args):
PowerPages(args)
3 changes: 3 additions & 0 deletions src/powerpwn/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
run_gui_command,
run_nocodemalware_command,
run_phishing_command,
run_powerpages_command,
run_recon_command,
)

Expand Down Expand Up @@ -52,6 +53,8 @@ def main():
run_copilot_chat_command(args)
elif command == "copilot-studio-hunter":
run_copilot_studio_command(args)
elif command == "powerpages":
run_powerpages_command(args)
else:
logger.info("Run `powerpwn --help` for available commands.")

Expand Down
154 changes: 154 additions & 0 deletions src/powerpwn/powerpages/powerpages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import requests
import xmltodict


class PowerPages:
"""
A class that is responsible for the Power Pages scan
"""

def __init__(self, args):
self.args = args
url = self.normalize_url(args.url)
self.url = url
self.scan()

def normalize_url(self, url):
if not url.startswith("https://"):
url = "https://" + url
if url.endswith("/"):
url = url[:-1]
url = url.replace("www.", "")
return url

def scan(self):
url = self.url
print(f"Checking `{url}`")
try:
odata_tables, api_tables = self.get_odata_tables()
if len(odata_tables) == 0:
print("You are safe!")
else:
print(f"Found {len(odata_tables)} open tables in '{url}'.")
if len(api_tables):
print(f"Examining additional {len(api_tables)} potential tables through the api")
print("\nChecking each table:")
for table in odata_tables:
try:
self.get_odata_table_data(table)
except Exception:
print(f"Can't access table {table['name']} through the odata right now")
for table in api_tables:
try:
self.get_api_table_data(table)
except Exception:
print(f"Can't access table {table['name']} through the api right now")
except Exception:
print(f"Can't access `{url}` anonymously")

def get_odata_tables(self):
url = self.url
odata_url = f"{url}/_odata/$metadata"
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"}
resp = requests.get(odata_url, headers=headers, timeout=10)
odata_tables = []
api_tables = []
if 200 <= resp.status_code <= 299:
content_xml = resp.content.decode("utf-8")
resp_json = xmltodict.parse(content_xml)
entity_types = resp_json.get("edmx:Edmx", {}).get("edmx:DataServices", {}).get("Schema", {}).get("EntityType", [])
entity_sets = (
resp_json.get("edmx:Edmx", {}).get("edmx:DataServices", {}).get("Schema", {}).get("EntityContainer", {}).get("EntitySet", [])
)
table_names = {entity_set["@EntityType"][4:]: entity_set["@Name"] for entity_set in entity_sets}

for entity in entity_types:
name = table_names.get(entity.get("@Name", "unknown"), "unknown")
key = entity.get("Key", {}).get("PropertyRef", {}).get("@Name", "unknown")
columns = [prop.get("@Name", "unknown") for prop in entity.get("Property", [])]
odata_tables.append({"name": name, "key": key, "columns": columns})
if name.endswith("Set"):
api_table_name = name.lower()[:-3] + "s"
api_tables.append({"name": api_table_name, "key": key, "columns": columns})

return odata_tables, api_tables

def get_api_table_data(self, table):
url = self.url
table_name = table["name"]
table_columns = table["columns"]
api_table_url = f"{url}/_api/{table_name}?$top=1"
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"}
resp = requests.get(api_table_url, headers=headers, timeout=10)
if 200 <= resp.status_code <= 299:
resp_json = resp.json()
if len(resp_json["value"]) > 0:
print(f"Table {table_name} is exposed with all columns through the API!")
return len(table_columns)
else:
print(f"Table {table_name} is accessible but returned no data through the API!")
return 0
elif resp.status_code in [400, 403]:
try:
resp_json = resp.json()
error_code = resp_json.get("error", {}).get("code", "")
if not error_code == "90040101":
raise Exception("Unknown error code")
except Exception:
print(f"Table {table_name} data is safe through the API")
return
print(f"Table {table_name} data is not exposed as a whole, checking each column...")
exposed = []
for column in table_columns:
api_column_url = f"{url}/_api/{table_name}?select={column}&$top=1"
resp = requests.get(api_column_url, headers=headers, timeout=5)
if 200 <= resp.status_code <= 299:
resp_json = resp.json()
if len(resp_json["value"]) > 0:
exposed.append(column)
if len(exposed):
print(f"Table {table_name} has the following columns exposed: {', '.join(exposed)} through the API")
return len(exposed)
else:
print(f"Table {table_name} data is safe through the API")
return 0

def get_odata_table_data(self, table):
url = self.url
table_name = table["name"]
table_columns = table["columns"]
table_url = f"{url}/_odata/{table_name}?$top=1"
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"}
resp = requests.get(table_url, headers=headers, timeout=10)
if 200 <= resp.status_code <= 299:
resp_json = resp.json()
if len(resp_json["value"]) > 0:
print(f"Table {table_name} is exposed with all columns through the odata!")
return len(table_columns)
else:
print(f"Table {table_name} is accessible but returned no data through the odata!")
return 0
elif resp.status_code in [400, 403]:
try:
resp_json = resp.json()
error_code = resp_json.get("error", {}).get("code", "")
if not error_code == "90040101":
raise Exception("Unknown error code")
except Exception:
print(f"Table {table_name} data is safe through the odata")
return
print(f"Table {table_name} data is not exposed as a whole, checking each column...")
exposed = []
for column in table_columns:
column_url = f"{url}/_odata/{table_name}?select={column}&$top=1"
resp = requests.get(column_url, headers=headers, timeout=5)
if 200 <= resp.status_code <= 299:
resp_json = resp.json()
if len(resp_json["value"]) > 0:
exposed.append(column)
if len(exposed):
print(f"Table {table_name} has the following columns exposed: {', '.join(exposed)} through the odata")
return len(exposed)
else:
print(f"Table {table_name} data is safe through the odata")
return 0

0 comments on commit c02b0c8

Please sign in to comment.