Skip to content

Commit

Permalink
Add unit test for Coriolis api midddleware auth
Browse files Browse the repository at this point in the history
  • Loading branch information
Cristi1324 committed Nov 7, 2023
1 parent 64d86bf commit 467fc2f
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 8 deletions.
30 changes: 22 additions & 8 deletions coriolis/api/middleware/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,36 @@


class CoriolisKeystoneContext(wsgi.Middleware):
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
def _get_project_id(self, req):
if 'X_TENANT_ID' in req.headers:
# This is the new header since Keystone went to ID/Name
return req.headers['X_TENANT_ID']
elif 'X_TENANT' in req.headers:
# This is for legacy compatibility
return req.headers['X_TENANT']
else:
raise webob.exc.HTTPBadRequest(
explanation=_("No 'X_TENANT_ID' or 'X_TENANT' passed."))

def _get_user(self, req):
user = req.headers.get('X_USER')
user = req.headers.get('X_USER_ID', user)
if user is None:
LOG.debug("Neither X_USER_ID nor X_USER found in request")
return webob.exc.HTTPUnauthorized()
return user

@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
user = self._get_user(req)

if isinstance(user, webob.exc.HTTPUnauthorized):
return user

project_id = self._get_project_id(req)

# get the roles
roles = [r.strip() for r in req.headers.get('X_ROLE', '').split(',')]
if 'X_TENANT_ID' in req.headers:
# This is the new header since Keystone went to ID/Name
project_id = req.headers['X_TENANT_ID']
else:
# This is for legacy compatibility
project_id = req.headers['X_TENANT']

project_name = req.headers.get('X_TENANT_NAME')
project_domain_name = req.headers.get('X-Project-Domain-Name')
Expand Down
Empty file added coriolis/tests/api/__init__.py
Empty file.
Empty file.
164 changes: 164 additions & 0 deletions coriolis/tests/api/middleware/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Copyright 2022 Cloudbase Solutions Srl
# All Rights Reserved.

from unittest import mock

from oslo_middleware import request_id
import webob

from coriolis.api.middleware import auth
from coriolis.api.middleware.auth import CoriolisKeystoneContext
from coriolis.api import wsgi
from coriolis import context
from coriolis.tests import test_base


class CoriolisTestException(Exception):
pass


class CoriolisKeystoneContextTestCase(test_base.CoriolisBaseTestCase):
"""Test suite for the Coriolis api middleware auth."""

def setUp(self):
super(CoriolisKeystoneContextTestCase, self).setUp()
self.auth = auth.CoriolisKeystoneContext(wsgi.Middleware)

@mock.patch.object(context, "RequestContext")
@mock.patch.object(CoriolisKeystoneContext, "_get_project_id")
@mock.patch.object(CoriolisKeystoneContext, "_get_user")
def test__call__(
self,
mock_get_user,
mock__get_project_id,
mock_request_context
):
req_mock = mock.Mock()

expected_roles = ['1', '2', '3']
expected_project_name = 'mock_project_name'
expected_project_domain_name = 'mock_project_domain_name'
expected_user_domain_name = 'mock_user_domain_name'
expected_auth_token = 'mock_token123'
expected_remote_address = 'mock_addr'
expected_service_catalog = {"catalog1": ["service1", "service2"]}
expected_req_id = 'mock_req_id'

req_mock.remote_addr = expected_remote_address
req_mock.environ = {
request_id.ENV_REQUEST_ID: expected_req_id.encode()
}

req_mock.headers = {
'X_ROLE': '1,2,3',
'X_TENANT_NAME': expected_project_name,
'X-Project-Domain-Name': expected_project_domain_name,
'X-User-Domain-Name': expected_user_domain_name,
'X_AUTH_TOKEN': expected_auth_token,
'X_SERVICE_CATALOG':
str(expected_service_catalog).replace("'", '"'),
}

result = self.auth(req_mock)

self.assertEqual(
self.auth.application,
result
)

mock_get_user.assert_called_once_with(req_mock)
mock__get_project_id.assert_called_once_with(req_mock)
mock_request_context.assert_called_once_with(
mock_get_user.return_value,
mock__get_project_id.return_value,
project_name=expected_project_name,
project_domain_name=expected_project_domain_name,
user_domain_name=expected_user_domain_name,
roles=expected_roles,
auth_token=expected_auth_token,
remote_address=expected_remote_address,
service_catalog=expected_service_catalog,
request_id=expected_req_id
)

self.assertEqual(
req_mock.environ['coriolis.context'],
mock_request_context.return_value
)

@mock.patch.object(CoriolisKeystoneContext, "_get_project_id")
@mock.patch.object(CoriolisKeystoneContext, "_get_user")
def test__call__invalid_service_catalog(
self,
mock_get_user,
mock__get_project_id,
):
req_mock = mock.Mock()

invalid_service_catalog = "mock_invalid_service_catalog"

req_mock.headers = {
'X_SERVICE_CATALOG': invalid_service_catalog,
}

self.assertRaises(
webob.exc.HTTPInternalServerError,
self.auth,
req_mock,
)

@mock.patch.object(context, "RequestContext")
def test__call__no_headers(self, mock_request_context):
req_mock = mock.Mock()

req_mock.headers = {}

result = self.auth(req_mock)

self.assertEqual(
webob.exc.HTTPUnauthorized().status,
result.status
)

def test_get_project_id_tenant_id(self):
req_mock = mock.Mock()

expected_result = 'mock_tenant'

req_mock.headers = {
'X_TENANT_ID': expected_result,
}

result = self.auth._get_project_id(req_mock)

self.assertEqual(
expected_result,
result
)

def test_get_project_id_tenant(self):
req_mock = mock.Mock()

expected_result = 'mock_tenant'

req_mock.headers = {
'X_TENANT': expected_result,
}

result = self.auth._get_project_id(req_mock)

self.assertEqual(
expected_result,
result
)

def test_get_project_id_no_tenant(self):
req_mock = mock.Mock()

req_mock.headers = {}

self.assertRaises(
webob.exc.HTTPBadRequest,
self.auth._get_project_id,
req_mock
)
Empty file.
Empty file.

0 comments on commit 467fc2f

Please sign in to comment.