-
Notifications
You must be signed in to change notification settings - Fork 43
/
flask_fas_openid.py
362 lines (319 loc) · 14.4 KB
/
flask_fas_openid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# -*- coding: utf-8 -*-
# Flask-FAS-OpenID - A Flask extension for authorizing users with FAS-OpenID
#
# Primary maintainer: Patrick Uiterwijk <puiterwijk@fedoraproject.org>
#
# Copyright (c) 2013, Patrick Uiterwijk
# This file is part of python-fedora
#
# python-fedora is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# python-fedora is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with python-fedora; if not, see <http://www.gnu.org/licenses/>
'''
FAS-OpenID authentication plugin for the flask web framework
.. moduleauthor:: Patrick Uiterwijk <puiterwijk@fedoraproject.org>
..versionadded:: 0.3.33
'''
from functools import wraps
import logging
import time
from munch import Munch
import flask
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
from openid.consumer import consumer
from openid.fetchers import setDefaultFetcher, Urllib2Fetcher
from openid.extensions import pape, sreg, ax
from openid_cla import cla
from openid_teams import teams
import six
log = logging.getLogger(__name__)
# http://flask.pocoo.org/snippets/45/
def request_wants_json():
''' Return wether the user requested the data in JSON or not. '''
best = flask.request.accept_mimetypes \
.best_match(['application/json', 'text/html'])
return best == 'application/json' and \
flask.request.accept_mimetypes[best] > \
flask.request.accept_mimetypes['text/html']
class FASJSONEncoder(flask.json.JSONEncoder):
""" Dedicated JSON encoder for the FAS openid information. """
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a ``TypeError``).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, (set, frozenset)):
return list(o)
return flask.json.JSONEncoder.default(self, o)
class FAS(object):
""" The Flask plugin. """
def __init__(self, app=None):
self.postlogin_func = None
self.app = app
if self.app is not None:
self.init_app(app)
def init_app(self, app):
""" Constructor for the Flask application. """
self.app = app
app.config.setdefault('FAS_OPENID_ENDPOINT',
'https://id.fedoraproject.org/openid/')
app.config.setdefault('FAS_OPENID_CHECK_CERT', True)
if not self.app.config['FAS_OPENID_CHECK_CERT']:
setDefaultFetcher(Urllib2Fetcher())
# json_encoder is only available from flask 0.10
version = flask.__version__.split('.')
assume_recent = False
try:
major = int(version[0])
minor = int(version[1])
except ValueError:
# We'll assume we're using a recent enough flask as the packages
# of old versions used sane version numbers.
assume_recent = True
if assume_recent or (major > 0 or minor >= 10):
self.app.json_encoder = FASJSONEncoder
@app.route('/_flask_fas_openid_handler/', methods=['GET', 'POST'])
def flask_fas_openid_handler():
""" Endpoint for OpenID results. """
return self._handle_openid_request()
app.before_request(self._check_session)
def postlogin(self, f):
"""Marks a function as post login handler. This decorator calls your
function after the login has been performed.
"""
self.postlogin_func = f
return f
def _handle_openid_request(self):
return_url = flask.session.get('FLASK_FAS_OPENID_RETURN_URL', None)
cancel_url = flask.session.get('FLASK_FAS_OPENID_CANCEL_URL', None)
base_url = self.normalize_url(flask.request.base_url)
oidconsumer = consumer.Consumer(flask.session, None)
info = oidconsumer.complete(flask.request.values, base_url)
display_identifier = info.getDisplayIdentifier()
if info.status == consumer.FAILURE and display_identifier:
return 'FAILURE. display_identifier: %s' % display_identifier
elif info.status == consumer.CANCEL:
if cancel_url:
return flask.redirect(cancel_url)
return 'OpenID request was cancelled'
elif info.status == consumer.SUCCESS:
if info.endpoint.server_url != \
self.app.config['FAS_OPENID_ENDPOINT']:
log.warn('Claim received from invalid issuer: %s',
info.endpoint.server_url)
return 'Invalid provider issued claim!'
sreg_resp = sreg.SRegResponse.fromSuccessResponse(info)
teams_resp = teams.TeamsResponse.fromSuccessResponse(info)
cla_resp = cla.CLAResponse.fromSuccessResponse(info)
ax_resp = ax.FetchResponse.fromSuccessResponse(info)
user = {'fullname': '', 'username': '', 'email': '',
'timezone': '', 'cla_done': False, 'groups': []}
if not sreg_resp:
# If we have no basic info, be gone with them!
return flask.redirect(cancel_url)
user['username'] = sreg_resp.get('nickname')
user['fullname'] = sreg_resp.get('fullname')
user['email'] = sreg_resp.get('email')
user['timezone'] = sreg_resp.get('timezone')
user['login_time'] = time.time()
if cla_resp:
user['cla_done'] = cla.CLA_URI_FEDORA_DONE in cla_resp.clas
if teams_resp:
# The groups do not contain the cla_ groups
user['groups'] = frozenset(teams_resp.teams)
# In the new AAA system, signing an agreement adds the user to a
# specific group. The FPCA has replaced the CLA.
if "signed_fpca" in user["groups"]:
user["cla_done"] = True
if ax_resp:
ssh_keys = ax_resp.get(
'http://fedoauth.org/openid/schema/SSH/key')
if isinstance(ssh_keys, (list, tuple)):
ssh_keys = '\n'.join(
ssh_key
for ssh_key in ssh_keys
if ssh_key.strip()
)
if ssh_keys:
user['ssh_key'] = ssh_keys
user['gpg_keyid'] = ax_resp.get(
'http://fedoauth.org/openid/schema/GPG/keyid')
flask.session['FLASK_FAS_OPENID_USER'] = user
flask.session.modified = True
if self.postlogin_func is not None:
self._check_session()
return self.postlogin_func(return_url)
else:
return flask.redirect(return_url)
else:
return 'Strange state: %s' % info.status
def _check_session(self):
if 'FLASK_FAS_OPENID_USER' not in flask.session \
or flask.session['FLASK_FAS_OPENID_USER'] is None:
flask.g.fas_user = None
else:
user = flask.session['FLASK_FAS_OPENID_USER']
# Add approved_memberships to provide backwards compatibility
# New applications should only use g.fas_user.groups
user['approved_memberships'] = []
for group in user['groups']:
membership = dict()
membership['name'] = group
user['approved_memberships'].append(Munch.fromDict(membership))
flask.g.fas_user = Munch.fromDict(user)
flask.g.fas_user.groups = frozenset(flask.g.fas_user.groups)
flask.g.fas_session_id = 0
def _check_safe_root(self, url):
if url is None:
return None
if url.startswith(flask.request.url_root) or url.startswith('/'):
# A URL inside the same app is deemed to always be safe
return url
return None
def login(self, username=None, password=None, return_url=None,
cancel_url=None, groups=['_FAS_ALL_GROUPS_']):
"""Tries to log in a user.
Sets the user information on :attr:`flask.g.fas_user`.
Will set 0 to :attr:`flask.g.fas_session_id, for compatibility
with flask_fas.
:kwarg username: Not used, but accepted for compatibility with the
flask_fas module
:kwarg password: Not used, but accepted for compatibility with the
flask_fas module
:kwarg return_url: The URL to forward the user to after login
:kwarg groups: A string or a list of group the user should belong
to to be authentified.
:returns: True if the user was succesfully authenticated.
:raises: Might raise an redirect to the OpenID endpoint
"""
if return_url is None:
if 'next' in flask.request.args.values():
return_url = flask.request.args.values['next']
else:
return_url = flask.request.url_root
# This makes sure that we only allow stuff where
# ?next= value is in a safe root (the application
# root)
return_url = (self._check_safe_root(return_url) or
flask.request.url_root)
session = {}
oidconsumer = consumer.Consumer(session, None)
try:
request = oidconsumer.begin(self.app.config['FAS_OPENID_ENDPOINT'])
except consumer.DiscoveryFailure as exc:
# VERY strange, as this means it could not discover an OpenID
# endpoint at FAS_OPENID_ENDPOINT
log.warn(exc)
return 'discoveryfailure'
if request is None:
# Also very strange, as this means the discovered OpenID
# endpoint is no OpenID endpoint
return 'no-request'
if isinstance(groups, six.string_types):
groups = [groups]
# Some applications pass the group list as a set. Convert to a list in
# case we need to append to it (see below).
if isinstance(groups, set):
groups = list(groups)
# In the new AAA system, we know a user has signed the FPCA by looking
# a group membership. We must therefore always request the
# corresponding group.
if "_FAS_ALL_GROUPS_" not in groups:
groups.append("signed_fpca")
request.addExtension(sreg.SRegRequest(
required=['nickname', 'fullname', 'email', 'timezone']))
request.addExtension(pape.Request([]))
request.addExtension(teams.TeamsRequest(requested=groups))
request.addExtension(cla.CLARequest(
requested=[cla.CLA_URI_FEDORA_DONE]))
ax_req = ax.FetchRequest()
ax_req.add(ax.AttrInfo(
type_uri='http://fedoauth.org/openid/schema/GPG/keyid'))
ax_req.add(ax.AttrInfo(
type_uri='http://fedoauth.org/openid/schema/SSH/key',
count='unlimited'))
request.addExtension(ax_req)
trust_root = self.normalize_url(flask.request.url_root)
return_to = trust_root + '_flask_fas_openid_handler/'
flask.session['FLASK_FAS_OPENID_RETURN_URL'] = return_url
flask.session['FLASK_FAS_OPENID_CANCEL_URL'] = cancel_url
if request_wants_json():
output = request.getMessage(trust_root,
return_to=return_to).toPostArgs()
output['server_url'] = request.endpoint.server_url
return flask.jsonify(output)
elif request.shouldSendRedirect():
redirect_url = request.redirectURL(trust_root, return_to, False)
return flask.redirect(redirect_url)
else:
return request.htmlMarkup(
trust_root, return_to,
form_tag_attrs={'id': 'openid_message'}, immediate=False)
def logout(self):
'''Logout the user associated with this session
'''
flask.session['FLASK_FAS_OPENID_USER'] = None
flask.g.fas_session_id = None
flask.g.fas_user = None
flask.session.modified = True
def normalize_url(self, url):
''' Replace the scheme prefix of a url with our preferred scheme.
'''
scheme = self.app.config['PREFERRED_URL_SCHEME']
scheme_index = url.index('://')
return scheme + url[scheme_index:]
# This is a decorator we can use with any HTTP method (except login, obviously)
# to require a login.
# If the user is not logged in, it will redirect them to the login form.
# http://flask.pocoo.org/docs/patterns/viewdecorators/#login-required-decorator
def fas_login_required(function):
""" Flask decorator to ensure that the user is logged in against FAS.
To use this decorator you need to have a function named 'auth_login'.
Without that function the redirect if the user is not logged in will not
work.
"""
@wraps(function)
def decorated_function(*args, **kwargs):
if flask.g.fas_user is None:
return flask.redirect(flask.url_for('auth_login',
next=flask.request.url))
return function(*args, **kwargs)
return decorated_function
def cla_plus_one_required(function):
""" Flask decorator to retrict access to CLA+1.
To use this decorator you need to have a function named 'auth_login'.
Without that function the redirect if the user is not logged in will not
work.
"""
@wraps(function)
def decorated_function(*args, **kwargs):
if flask.g.fas_user is None or not flask.g.fas_user.cla_done \
or len(flask.g.fas_user.groups) < 1:
# FAS-OpenID does not return cla_ groups
return flask.redirect(flask.url_for('auth_login',
next=flask.request.url))
else:
return function(*args, **kwargs)
return decorated_function