From dc0628bd28e09cefa954a86f6272522b985f991a Mon Sep 17 00:00:00 2001 From: guyn-sf Date: Thu, 20 Jul 2023 15:06:12 +0300 Subject: [PATCH] Silverfort support for aws-adfs authentications Signed-off-by: guyn-sf --- aws_adfs/_silverfort_authenticator.py | 65 +++++++++++++++++++++++++++ aws_adfs/authenticator.py | 16 +++++++ 2 files changed, 81 insertions(+) create mode 100644 aws_adfs/_silverfort_authenticator.py diff --git a/aws_adfs/_silverfort_authenticator.py b/aws_adfs/_silverfort_authenticator.py new file mode 100644 index 0000000..d78b158 --- /dev/null +++ b/aws_adfs/_silverfort_authenticator.py @@ -0,0 +1,65 @@ +import time +import logging + +import click +import lxml.etree as ET +from . import roles_assertion_extractor +from .helpers import trace_http_request + +NOT_AUTHORIZED = 'Not authorized' +MFA_TIMEOUT = 'MFA request expired' + + +def extract(html_response, ssl_verification_enabled, session): + return _retrieve_roles_page(html_response, session, ssl_verification_enabled) + + +def _retrieve_roles_page(html_response, session, ssl_verification_enabled): + try: + response = session.post( + _action_url_on_validation_success(html_response), + verify=ssl_verification_enabled, + allow_redirects=True, + data={ + 'AuthMethod': 'SilverfortAdfs', + 'Context': _context(html_response), + } + ) + trace_http_request(response) + if response.status_code != 200: + raise click.ClickException( + 'Issues during redirection to aws roles page. The error response {}'.format(response)) + html_response = ET.fromstring(response.text, ET.HTMLParser()) + element = html_response.find('.//input[@name="SAMLResponse"]') + if element is not None: + logging.debug("Successfully retrieved user SAML token") + # Save session cookies to avoid having to repeat MFA on each login + session.cookies.save(ignore_discard=True) + html_response = ET.fromstring(response.text, ET.HTMLParser()) + return roles_assertion_extractor.extract(html_response) + + except Exception as e: + error_message = 'Encountered an error while trying to retrieve AWS roles page' + logging.exception(error_message) + raise click.ClickException(f"{error_message}: {e}") + + if NOT_AUTHORIZED in response.text: + raise click.ClickException("Access is denied, User is not authorized") + + if MFA_TIMEOUT in response.text: + raise click.ClickException("Access is denied, Timeout waiting for MFA approval") + + raise click.ClickException(f"Received an unexpected response: {response}. Returning") + + +def _context(html_response): + context_query = './/input[@id="context"]' + element = html_response.find(context_query) + return element.get('value') + + +def _action_url_on_validation_success(html_response): + post_url_query = './/form[@id="options"]' + element = html_response.find(post_url_query) + + return element.get('action') diff --git a/aws_adfs/authenticator.py b/aws_adfs/authenticator.py index 3f4eb22..2bd40f6 100644 --- a/aws_adfs/authenticator.py +++ b/aws_adfs/authenticator.py @@ -8,6 +8,7 @@ from . import _symantec_vip_access as symantec_vip_access from . import _azure_mfa_authenticator as azure_mfa_auth from . import _azure_cloud_mfa_authenticator as azure_cloud_mfa_auth +from . import _silverfort_authenticator as silverfort_mfa_auth from . import html_roles_fetcher from . import roles_assertion_extractor from .helpers import trace_http_request @@ -123,6 +124,11 @@ def extract(): return azure_cloud_mfa_auth.extract(html_response, config.ssl_verification, config.aad_verification_code, session) return extract + def _silverfort_extractor(): + def extract(): + return silverfort_mfa_auth.extract(html_response, config.ssl_verification, session) + return extract + if assertfile is None: chosen_strategy = _plain_extractor else: @@ -140,6 +146,8 @@ def extract(): chosen_strategy = _azure_mfa_extractor elif _is_azure_cloud_mfa_authentication(html_response): chosen_strategy = _azure_cloud_mfa_extractor + elif _is_silverfort_mfa_authentication(html_response): + chosen_strategy = _silverfort_extractor return chosen_strategy() @@ -195,3 +203,11 @@ def _is_azure_cloud_mfa_authentication(html_response): element is not None and element.get('value') == 'AzureMfaAuthentication' ) + +def _is_silverfort_mfa_authentication(html_response): + auth_method = './/input[@id="authMethod"]' + element = html_response.find(auth_method) + return ( + element is not None + and element.get('value') == 'SilverfortAdfs' + )