Skip to content
This repository has been archived by the owner on Nov 14, 2021. It is now read-only.

Commit

Permalink
Merge pull request #20 from the-scouts/login-refactor
Browse files Browse the repository at this point in the history
Login refactor
  • Loading branch information
AA-Turner authored Feb 6, 2021
2 parents c815eb4 + 5b348ce commit 7c5700d
Showing 1 changed file with 99 additions and 98 deletions.
197 changes: 99 additions & 98 deletions compass/core/logon.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
import time
from typing import Literal, Optional
from typing import Literal

import certifi
from lxml import html
Expand All @@ -18,9 +18,32 @@
from compass.core.utility import setup_tls_certs

TYPES_UNIT_LEVELS = Literal["Group", "District", "County", "Region", "Country", "Organisation"]
TYPES_STO = Literal[None, "0", "5", "X"]


class Logon(InterfaceBase):
"""Create connection to Compass and authenticate. Holds session state.
Logon flow is:
1. Create a persistent state object (Session) to hold headers, cookies etc.
a. Check and verify that TLS certificates for Compass have been set up.
b. Get ASP.NET session cookie from Compass. (HTTP request #1)
2. Post login data to Compass. (HTTP request #2)
3. Get sample page from Compass. (HTTP request #3)
4. Verify login was successful
a. Create dict of compass internal variables, e.g. Master.User.CN (POST_CTRL).
b. Create dict of user's role title to internal role number
c. Update data for authorisation headers for requests to Compass.
IF Role title specified:
5. Post new role number to Compass (HTTP request #4)
6. Get sample page from Compass. (HTTP request #5)
7. Get currently active role number, and check this equals the requested role number.
a. Create dict of compass internal variables, e.g. Master.User.CN (POST_CTRL).
b. Create dict of user's role title to internal role number
c. Update data for authorisation headers for requests to Compass.
"""

def __init__(self, credentials: tuple[str, str], role_to_use: str = None):
"""Constructor for Logon."""
self._member_role_number = 0
Expand All @@ -29,14 +52,23 @@ def __init__(self, credentials: tuple[str, str], role_to_use: str = None):
self.role_to_use: str = role_to_use

self.current_role: str = ""
self.current_role_number: int = 0
self.roles_dict: dict = {}

super().__init__(self._do_logon(credentials, role_to_use))
# Create session
super().__init__(self._create_session())

# Log in and try to confirm success
self._logon_remote(credentials)

if role_to_use is not None:
# Session contains updated auth headers from role change
self.change_role(role_to_use)

self.sto_thread = PeriodicTimer(150, self._extend_session_timeout)
# self.sto_thread.start()

# properties/accessors code:

@property
def mrn(self) -> int:
return self.compass_dict["Master.User.MRN"] # Member Role Number
Expand Down Expand Up @@ -70,7 +102,9 @@ def hierarchy(self) -> schemas.hierarchy.HierarchyLevel:

return schemas.hierarchy.HierarchyLevel(id=unit_number, level=level_map[unit_level])

def _get(self, url: str, auth_header: bool = False, session: Optional[requests.Session] = None, **kwargs) -> requests.Response:
# _get override code:

def _get(self, url: str, auth_header: bool = False, **kwargs) -> requests.Response:
"""Override get method with custom auth_header logic."""
# pylint: disable=arguments-differ
if auth_header:
Expand All @@ -83,10 +117,7 @@ def _get(self, url: str, auth_header: bool = False, session: Optional[requests.S
kwargs["headers"] = {**kwargs.get("headers", {}), **headers}
kwargs["params"] = {**kwargs.get("params", {}), **params}

if session is not None:
return session.get(url, **kwargs)
else:
return super(Logon, self)._get(url, **kwargs)
return super(Logon, self)._get(url, **kwargs)

def _jk_hash(self) -> str:
"""Generate JK Hash needed by Compass."""
Expand All @@ -98,23 +129,14 @@ def _jk_hash(self) -> str:
self._post(f"{Settings.base_url}/System/Preflight", json=data)
return key_hash

def _extend_session_timeout(self, sto: Literal[None, "0", "5", "X"] = "0"):
# Timeout code:

def _extend_session_timeout(self, sto: TYPES_STO = "0"):
# Session time out. 4 values: None (normal), 0 (STO prompt) 5 (Extension, arbitrary constant) X (Hard limit)
logger.debug(f"Extending session length {datetime.datetime.now()}")
return self._get(f"{Settings.web_service_path}/STO_CHK", auth_header=True, params={"pExtend": sto})

def _do_logon(self, credentials: tuple[str, str] = None, role_to_use: str = None) -> requests.Session:
"""Log in to Compass, change role and confirm success."""
session = self._create_session()

# Log in and try to confirm success
self._logon_remote(credentials, session)
self._confirm_login_success(session)

# Session contains updated auth headers from role change
session = self.change_role(role_to_use, session=session)

return session
# Core login code:

@staticmethod
def _create_session() -> requests.Session:
Expand All @@ -138,9 +160,8 @@ def _create_session() -> requests.Session:

return session

@staticmethod
def _logon_remote(auth: tuple[str, str], session: requests.Session) -> requests.Response:
"""Interface code with Compass."""
def _logon_remote(self, auth: tuple[str, str]) -> requests.Response:
"""Log in to Compass and confirm success."""
# Referer is genuinely needed otherwise login doesn't work
headers = {"Referer": f"{Settings.base_url}/login/User/Login"}

Expand All @@ -153,47 +174,61 @@ def _logon_remote(auth: tuple[str, str], session: requests.Session) -> requests.

# log in
logger.info("Logging in")
response = session.post(f"{Settings.base_url}/Login.ashx", headers=headers, data=credentials)
response = self._post(f"{Settings.base_url}/Login.ashx", headers=headers, data=credentials)

# verify log in was successful
self._verify_success_update_properties()

return response

def change_role(self, new_role: Optional[str], session: Optional[requests.Session] = None) -> requests.Session:
"""Update role information."""
if session is None:
try:
session = self.s
except AttributeError:
raise ValueError("No session! session object must be passed or self.s set.") from None

if new_role is not None:
logger.info("Changing role")
new_role = new_role.strip()

# Change role to the specified role number
member_role_number = {v: k for k, v in self.roles_dict.items()}[new_role]
session.post(f"{Settings.base_url}/API/ChangeRole", json={"MRN": str(member_role_number)})
else:
logger.info("not changing role")
member_role_number = self.current_role_number
def _verify_success_update_properties(self, check_role_number: int = None) -> None:
"""Confirms success and updates authorisation."""
# Test 'get' for an exemplar page that needs authorisation.
portal_url = f"{Settings.base_url}/MemberProfile.aspx?Page=ROLES&TAB"
response = self._get(portal_url)

# Confirm Compass is reporting the changed role number, update auth headers
form_tree = self._confirm_role_change(session, member_role_number)
session = self._update_authorisation(form_tree, session)
# # Response body is login page for failure (~8Kb), but success is a 300 byte page.
# if int(post_response.headers.get("content-length", 901)) > 900:
# raise CompassAuthenticationError("Login has failed")
# Naive check for error, Compass redirects to an error page when something goes wrong
# TODO what is the error page URL - what do we expect? From memory Error.aspx
if response.url != portal_url:
raise CompassAuthenticationError("Login has failed")

if member_role_number != self.mrn:
raise CompassAuthenticationError("Compass Authentication failed to update")
# Create lxml html.FormElement
form = html.fromstring(response.content).forms[0]

logger.info(f"Role updated successfully! Role is now {self.current_role}.")
# Update session dicts with new role
self.compass_dict = self._create_compass_dict(form) # Updates MRN property etc.
self.roles_dict = self._create_roles_dict(form)

return session
# Set auth headers for new role
auth_headers = {
"Authorization": f"{self.cn}~{self.mrn}",
"SID": self.compass_dict["Master.Sys.SessionID"], # Session ID
}
self._update_headers(auth_headers)

# Update current role properties
self.current_role = self.roles_dict[self.mrn]
location = next(row[2].text_content() for row in form.xpath("//tbody/tr") if int(row.get("data-pk")) == self.mrn)
logger.debug(f"Using Role: {self.current_role} ({location.strip()})")

# Verify role number against test value
if check_role_number is not None:
logger.debug("Confirming role has been changed")
# Check that the role has been changed to the desired role. If not, raise exception.
if check_role_number != self.mrn:
raise CompassAuthenticationError("Role failed to update in Compass")

@staticmethod
def _create_compass_dict(form_tree: html.FormElement) -> dict:
"""Create Compass info dict from FormElement."""
compass_dict = {}
compass_vars = form_tree.fields["ctl00$_POST_CTRL"]
for pair in compass_vars.split("~"):
key, value, *_ = pair.split("#")
compass_dict[key] = cast(value)
key, value = pair.split("#", 1)
compass_dict[key] = cast(value) # int or str

return compass_dict

Expand All @@ -203,53 +238,19 @@ def _create_roles_dict(form_tree: html.FormElement) -> dict:
roles_selector = form_tree.inputs["ctl00$UserTitleMenu$cboUCRoles"] # get roles from compass page (list of option tags)
return {int(role.get("value")): role.text.strip() for role in roles_selector}

@staticmethod
def _get_active_role_number(form_tree: html.FormElement) -> int:
"""Gets active (selected) role from FormElement."""
return int(form_tree.inputs["ctl00$UserTitleMenu$cboUCRoles"].value)

def _confirm_login_success(self, session: requests.Session) -> None:
"""Confirms success and updates authorisation."""
portal_url = f"{Settings.base_url}/ScoutsPortal.aspx"
response = self._get(portal_url, session=session)
def change_role(self, new_role: str) -> None:
"""Update role information.
# # Response body is login page for failure (~8Kb), but success is a 300 byte page.
# if int(post_response.headers.get("content-length", 901)) > 900:
# raise CompassAuthenticationError("Login has failed")
if response.url != portal_url:
raise CompassAuthenticationError("Login has failed")
If the user has multiple roles with the same role title, the first is used.
"""
logger.info("Changing role")

form = html.fromstring(response.content).forms[0]
self._update_authorisation(form, session)
# Change role to the specified role number
member_role_number = next(num for num, name in self.roles_dict.items() if name == new_role.strip())
response = self._post(f"{Settings.base_url}/API/ChangeRole", json={"MRN": member_role_number}) # b"false"
logger.debug(f"Compass ChangeRole call returned: {response.json()}")

def _confirm_role_change(self, session: requests.Session, check_role_number: int) -> html.FormElement:
"""Confirms success and updates authorisation."""
portal_url = f"{Settings.base_url}/ScoutsPortal.aspx"
response = self._get(portal_url, session=session)
form = html.fromstring(response.content).forms[0]

logger.debug("Confirming role has been changed")
# Check that the role has been changed to the desired role. If not, raise exception.
if self._get_active_role_number(form) != check_role_number:
raise CompassError("Role failed to update in Compass")

return form

def _update_authorisation(self, form: html.FormElement, session: requests.Session) -> requests.Session:
"""Update authorisation data."""
self.compass_dict = self._create_compass_dict(form) # Updates MRN property etc.
self.roles_dict = self._create_roles_dict(form)

# Set auth headers for new role
auth_headers = {
"Authorization": f"{self.cn}~{self.mrn}",
"SID": self.compass_dict["Master.Sys.SessionID"], # Session ID
}
session.headers.update(auth_headers)

# TODO is this get role bit needed given that we change the role?
self.current_role = self.roles_dict[int(self.mrn)]
self.current_role_number = self._get_active_role_number(form)
logger.debug(f"Using Role: {self.current_role}")
# Confirm Compass is reporting the changed role number, update auth headers
self._verify_success_update_properties(check_role_number=member_role_number)

return session
logger.info(f"Role updated successfully! Role is now {self.current_role}.")

0 comments on commit 7c5700d

Please sign in to comment.