-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathokta_auth.py
108 lines (89 loc) · 3.54 KB
/
okta_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Call Okta APIs using Session (cookies) and xsrfToken -- just like a browser. SSWS API Token is not needed.
### CAUTION: Code to call private APIs is not supported by Okta. The APIs can change at any time.
### Test in a dev or oktapreview tenant before trying this in production.
This will work with users who have Push MFA. This won't work if Apps > Apps > "Okta Admin Console" > Sign On Policy > MFA is enabled.
"""
import requests
import getpass
import time
import re
# Set these:
okta_url = 'https://ORG.okta.com'
username = '...'
okta_admin_url = okta_url.replace('.', '-admin.', 1)
# A session uses cookies and reuses the HTTP connection.
session = requests.Session()
def main():
sign_in()
user = get_user('me')
admin_xsrf_token = admin_sign_in()
send_notification(admin_xsrf_token, user['id'])
get_user_factors(user['id'])
def sign_in():
print('Okta URL:', okta_url)
print('Username:', username)
password = getpass.getpass()
print('Signing in...')
response = session.post(okta_url + '/api/v1/authn', json={'username': username, 'password': password})
authn = response.json()
if not response.ok:
print(authn['errorSummary'])
exit()
if authn['status'] == 'MFA_REQUIRED':
token = send_push(authn['_embedded']['factors'], authn['stateToken'])
else:
token = authn['sessionToken']
session.get(okta_url + '/login/sessionCookie?token=' + token)
def send_push(factors, state_token):
print('Push MFA...')
push_factors = [f for f in factors if f['factorType'] == 'push']
if not push_factors:
print('Push factor not found')
exit()
push_url = push_factors[0]['_links']['verify']['href']
while True:
authn = session.post(push_url, json={'stateToken': state_token}).json()
if authn['status'] == 'SUCCESS':
return authn['sessionToken']
result = authn['factorResult']
if result == 'WAITING':
time.sleep(4) # 4 seconds
print('Waiting...')
elif result in ['REJECTED', 'TIMEOUT']:
print('Push rejected')
exit()
def admin_sign_in():
response = session.get(okta_url + '/home/admin-entry')
# old style
match = re.search(r'"token":\["(.*)"\]', response.text)
if match:
body = {'token': match.group(1)}
response = session.post(okta_admin_url + '/admin/sso/request', data=body)
# new style (w/ new OIDC flow)
match = re.search(r'<span.* id="_xsrfToken">(.*)</span>', response.text)
if not match:
print('admin_sign_in: token not found. Go to Apps > Apps > "Okta Admin Console" > Sign On Policy and disable MFA.')
exit()
admin_xsrf_token = match.group(1)
return admin_xsrf_token
def send_notification(admin_xsrf_token, userid):
body = {
'message': 'testing 1125',
'target': 'GROUPS_AND_USERS', # or EVERYONE
'users': [userid],
'groups': []
}
headers = {'X-Okta-XsrfToken': admin_xsrf_token}
notification = session.post(okta_admin_url + '/api/internal/admin/notification', json=body, headers=headers).json()
print('\nNotification:')
print(notification)
def get_user(userid):
user = session.get(okta_url + '/api/v1/users/' + userid).json()
print('\nUser:')
print(user)
return user
def get_user_factors(userid):
factors = session.get(okta_url +'/api/v1/users/' + userid + '/factors').json()
print('\nFactors:')
print(factors)
main()