Skip to content

Commit

Permalink
Merge pull request #415 from Silverfort/master
Browse files Browse the repository at this point in the history
Silverfort support for aws-adfs authentications
  • Loading branch information
pdecat authored Oct 22, 2023
2 parents 693d24d + 13efdb2 commit cf545cd
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ aws-adfs integrates with:
* OTP 6 digit codes
* SMS codes
* Phone call
* [Silverfort](https://www.silverfort.com/) MFA provider

## Setup Dependencies

Expand Down
65 changes: 65 additions & 0 deletions aws_adfs/_silverfort_authenticator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import time
import logging

import click
import lxml.etree as ET
from . import roles_assertion_extractor
from .helpers import trace_http_request

NOT_AUTHORIZED = 'Not authorized'
MFA_TIMEOUT = 'MFA request expired'


def extract(html_response, ssl_verification_enabled, session):
return _retrieve_roles_page(html_response, session, ssl_verification_enabled)


def _retrieve_roles_page(html_response, session, ssl_verification_enabled):
try:
response = session.post(
_action_url_on_validation_success(html_response),
verify=ssl_verification_enabled,
allow_redirects=True,
data={
'AuthMethod': 'SilverfortAdfs',
'Context': _context(html_response),
}
)
trace_http_request(response)
if response.status_code != 200:
raise click.ClickException(
'Issues during redirection to aws roles page. The error response {}'.format(response))
html_response = ET.fromstring(response.text, ET.HTMLParser())
element = html_response.find('.//input[@name="SAMLResponse"]')
if element is not None:
logging.debug("Successfully retrieved user SAML token")
# Save session cookies to avoid having to repeat MFA on each login
session.cookies.save(ignore_discard=True)
html_response = ET.fromstring(response.text, ET.HTMLParser())
return roles_assertion_extractor.extract(html_response)

except Exception as e:
error_message = 'Encountered an error while trying to retrieve AWS roles page'
logging.exception(error_message)
raise click.ClickException(f"{error_message}: {e}")

if NOT_AUTHORIZED in response.text:
raise click.ClickException("Access is denied. User is not authorized")

if MFA_TIMEOUT in response.text:
raise click.ClickException("Access is denied. Timeout waiting for MFA approval")

raise click.ClickException(f"Received an unexpected response: {response}. Returning")


def _context(html_response):
context_query = './/input[@id="context"]'
element = html_response.find(context_query)
return element.get('value')


def _action_url_on_validation_success(html_response):
post_url_query = './/form[@id="options"]'
element = html_response.find(post_url_query)

return element.get('action')
22 changes: 21 additions & 1 deletion aws_adfs/authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from . import _symantec_vip_access as symantec_vip_access
from . import _azure_mfa_authenticator as azure_mfa_auth
from . import _azure_cloud_mfa_authenticator as azure_cloud_mfa_auth
from . import _silverfort_authenticator as silverfort_mfa_auth
from . import html_roles_fetcher
from . import roles_assertion_extractor
from .helpers import trace_http_request


def authenticate(config, username=None, password=None, assertfile=None, aad_verification_code=None):
def authenticate(config, username=None, password=None, assertfile=None):
response, session = html_roles_fetcher.fetch_html_encoded_roles(
adfs_host=config.adfs_host,
adfs_cookie_location=config.adfs_cookie_location,
Expand Down Expand Up @@ -123,6 +124,11 @@ def extract():
return azure_cloud_mfa_auth.extract(html_response, config.ssl_verification, config.aad_verification_code, session)
return extract

def _silverfort_extractor():
def extract():
return silverfort_mfa_auth.extract(html_response, config.ssl_verification, session)
return extract

if assertfile is None:
chosen_strategy = _plain_extractor
else:
Expand All @@ -140,6 +146,8 @@ def extract():
chosen_strategy = _azure_mfa_extractor
elif _is_azure_cloud_mfa_authentication(html_response):
chosen_strategy = _azure_cloud_mfa_extractor
elif _is_silverfort_mfa_authentication(html_response):
chosen_strategy = _silverfort_extractor

return chosen_strategy()

Expand Down Expand Up @@ -172,6 +180,7 @@ def _is_symantec_vip_authentication(html_response):
):
return True


def _is_rsa_authentication(html_response):
auth_method = './/input[@id="authMethod"]'
element = html_response.find(auth_method)
Expand All @@ -180,6 +189,7 @@ def _is_rsa_authentication(html_response):
and element.get('value') == 'SecurIDAuthentication'
)


def _is_azure_mfa_authentication(html_response):
auth_method = './/input[@id="authMethod"]'
element = html_response.find(auth_method)
Expand All @@ -188,10 +198,20 @@ def _is_azure_mfa_authentication(html_response):
and element.get('value') == 'AzureMfaServerAuthentication'
)


def _is_azure_cloud_mfa_authentication(html_response):
auth_method = './/input[@id="authMethod"]'
element = html_response.find(auth_method)
return (
element is not None
and element.get('value') == 'AzureMfaAuthentication'
)


def _is_silverfort_mfa_authentication(html_response):
auth_method = './/input[@id="authMethod"]'
element = html_response.find(auth_method)
return (
element is not None
and element.get('value') == 'SilverfortAdfs'
)

0 comments on commit cf545cd

Please sign in to comment.