diff --git a/onadata/apps/api/tests/viewsets/test_xform_list_viewset.py b/onadata/apps/api/tests/viewsets/test_xform_list_viewset.py
index 723aaf12ef..e21b183469 100644
--- a/onadata/apps/api/tests/viewsets/test_xform_list_viewset.py
+++ b/onadata/apps/api/tests/viewsets/test_xform_list_viewset.py
@@ -9,12 +9,12 @@
from django_digest.test import DigestAuth
from mock import patch
-from onadata.apps.api.tests.viewsets.test_abstract_viewset import (
- TestAbstractViewSet
-)
+from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet
from onadata.apps.api.viewsets.project_viewset import ProjectViewSet
from onadata.apps.api.viewsets.xform_list_viewset import (
- PreviewXFormListViewSet, XFormListViewSet)
+ PreviewXFormListViewSet,
+ XFormListViewSet,
+)
from onadata.apps.main.models import MetaData
from onadata.libs.permissions import DataEntryRole, ReadOnlyRole, OwnerRole
@@ -26,78 +26,73 @@ def setUp(self):
self._publish_xls_form_to_project()
def test_get_xform_list(self):
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request)
self.assertEqual(response.status_code, 200)
- path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList.xml')
- with open(path, encoding='utf-8') as f:
+ path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "formList.xml")
+ with open(path, encoding="utf-8") as f:
form_list_xml = f.read().strip()
data = {"hash": self.xform.hash, "pk": self.xform.pk}
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list_xml % data)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'],
- 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_get_xform_list_xform_pk_filter_anon(self):
"""
Test formList xform_pk filter for anonymous user.
"""
- request = self.factory.get('/')
- response = self.view(request, username=self.user.username,
- xform_pk=self.xform.pk + 10000)
+ request = self.factory.get("/")
+ response = self.view(
+ request, username=self.user.username, xform_pk=self.xform.pk + 10000
+ )
self.assertEqual(response.status_code, 404)
# existing form is in result when xform_pk filter is in use.
- response = self.view(request, username=self.user.username,
- xform_pk=self.xform.pk)
+ response = self.view(
+ request, username=self.user.username, xform_pk=self.xform.pk
+ )
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList.xml')
+ path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "formList.xml")
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list_xml = f.read().strip()
data = {"hash": self.xform.hash, "pk": self.xform.pk}
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list_xml % data)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'],
- 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_get_xform_list_form_id_filter(self):
"""
Test formList formID filter
"""
# Test unrecognized formID
- request = self.factory.get('/', {'formID': 'unrecognizedID'})
+ request = self.factory.get("/", {"formID": "unrecognizedID"})
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data, [])
# Test a valid formID
- request = self.factory.get('/', {'formID': self.xform.id_string})
+ request = self.factory.get("/", {"formID": self.xform.id_string})
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 200)
- path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList.xml')
+ path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "formList.xml")
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list_xml = f.read().strip()
data = {"hash": self.xform.hash, "pk": self.xform.pk}
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list_xml % data)
def test_form_id_filter_for_non_require_auth_account(self):
@@ -105,13 +100,18 @@ def test_form_id_filter_for_non_require_auth_account(self):
Test formList formID filter for account that requires authentication
"""
# Bob submit forms
- xls_path = os.path.join(settings.PROJECT_ROOT, "apps", "main", "tests",
- "fixtures", "tutorial.xlsx")
+ xls_path = os.path.join(
+ settings.PROJECT_ROOT, "apps", "main", "tests", "fixtures", "tutorial.xlsx"
+ )
self._publish_xls_form_to_project(xlsform_path=xls_path)
- xls_file_path = os.path.join(settings.PROJECT_ROOT, "apps", "logger",
- "fixtures",
- "external_choice_form_v1.xlsx")
+ xls_file_path = os.path.join(
+ settings.PROJECT_ROOT,
+ "apps",
+ "logger",
+ "fixtures",
+ "external_choice_form_v1.xlsx",
+ )
self._publish_xls_form_to_project(xlsform_path=xls_file_path)
# Set require auth to true for this user
@@ -122,10 +122,12 @@ def test_form_id_filter_for_non_require_auth_account(self):
self.xform.shared = False
self.xform.save()
request = self.factory.get(
- f'/{self.user.username}/{self.xform.pk}/formList',
- {'formID': self.xform.id_string})
+ f"/{self.user.username}/{self.xform.pk}/formList",
+ {"formID": self.xform.id_string},
+ )
response = self.view(
- request, username=self.user.username, xform_pk=self.xform.pk)
+ request, username=self.user.username, xform_pk=self.xform.pk
+ )
self.assertEqual(response.status_code, 401)
self.user.profile.require_auth = False
@@ -136,29 +138,33 @@ def test_form_id_filter_for_non_require_auth_account(self):
self.xform.save()
alice_data = {
- 'username': 'alice',
- 'email': 'alice@localhost.com',
- 'password1': 'alice',
- 'password2': 'alice'
+ "username": "alice",
+ "email": "alice@localhost.com",
+ "password1": "alice",
+ "password2": "alice",
}
self._create_user_profile(alice_data)
- auth = DigestAuth('alice', 'alice')
+ auth = DigestAuth("alice", "alice")
request = self.factory.get(
- f'/{self.user.username}/{self.xform.pk}/formList',
- {'formID': self.xform.id_string})
+ f"/{self.user.username}/{self.xform.pk}/formList",
+ {"formID": self.xform.id_string},
+ )
request.META.update(auth(request.META, response))
response = self.view(
- request, username=self.user.username, xform_pk=self.xform.pk)
+ request, username=self.user.username, xform_pk=self.xform.pk
+ )
self.assertEqual(response.status_code, 200)
# ensure anonymous users still have access
# to the xform with id self.xform.pk
request = self.factory.get(
- f'/{self.user.username}/{self.xform.pk}/formList',
- {'formID': self.xform.id_string})
+ f"/{self.user.username}/{self.xform.pk}/formList",
+ {"formID": self.xform.id_string},
+ )
response = self.view(
- request, username=self.user.username, xform_pk=self.xform.pk)
+ request, username=self.user.username, xform_pk=self.xform.pk
+ )
self.assertEqual(response.status_code, 200)
def test_form_id_filter_for_require_auth_account(self):
@@ -166,101 +172,109 @@ def test_form_id_filter_for_require_auth_account(self):
Test formList formID filter for account that requires authentication
"""
# Bob submit forms
- xls_path = os.path.join(settings.PROJECT_ROOT, "apps", "main", "tests",
- "fixtures", "tutorial.xlsx")
+ xls_path = os.path.join(
+ settings.PROJECT_ROOT, "apps", "main", "tests", "fixtures", "tutorial.xlsx"
+ )
self._publish_xls_form_to_project(xlsform_path=xls_path)
- xls_file_path = os.path.join(settings.PROJECT_ROOT, "apps", "logger",
- "fixtures",
- "external_choice_form_v1.xlsx")
+ xls_file_path = os.path.join(
+ settings.PROJECT_ROOT,
+ "apps",
+ "logger",
+ "fixtures",
+ "external_choice_form_v1.xlsx",
+ )
self._publish_xls_form_to_project(xlsform_path=xls_file_path)
# Set require auth to true
self.user.profile.require_auth = True
self.user.profile.save()
- request = self.factory.get('/', {'formID': self.xform.id_string})
+ request = self.factory.get("/", {"formID": self.xform.id_string})
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 401)
# Test for authenticated user but unrecognized formID
- auth = DigestAuth('bob', 'bobbob')
- request = self.factory.get('/', {'formID': 'unrecognizedID'})
+ auth = DigestAuth("bob", "bobbob")
+ request = self.factory.get("/", {"formID": "unrecognizedID"})
request.META.update(auth(request.META, response))
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data, [])
# Test for authenticated user and valid formID
- request = self.factory.get('/', {'formID': self.xform.id_string})
+ request = self.factory.get("/", {"formID": self.xform.id_string})
self.assertTrue(self.user.profile.require_auth)
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 200)
path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList2.xml')
+ os.path.dirname(__file__), "..", "fixtures", "formList2.xml"
+ )
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list = f.read().strip()
- data = {"hash": self.xform.hash, "pk": self.xform.pk,
- 'version': self.xform.version}
- content = response.render().content.decode('utf-8')
+ data = {
+ "hash": self.xform.hash,
+ "pk": self.xform.pk,
+ "version": self.xform.version,
+ }
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list % data)
# Test for shared forms
# Create user Alice
alice_data = {
- 'username': 'alice',
- 'email': 'alice@localhost.com',
- 'password1': 'alice',
- 'password2': 'alice'
+ "username": "alice",
+ "email": "alice@localhost.com",
+ "password1": "alice",
+ "password2": "alice",
}
alice_profile = self._create_user_profile(alice_data)
# check that she can authenticate successfully
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('alice', 'alice')
+ auth = DigestAuth("alice", "alice")
request.META.update(auth(request.META, response))
response = self.view(request)
self.assertEqual(response.status_code, 200)
- self.assertFalse(
- ReadOnlyRole.user_has_role(alice_profile.user, self.project))
+ self.assertFalse(ReadOnlyRole.user_has_role(alice_profile.user, self.project))
# share Bob's project with Alice
- data = {
- 'username': 'alice',
- 'role': ReadOnlyRole.name
- }
- request = self.factory.post('/', data=data, **self.extra)
- share_view = ProjectViewSet.as_view({'post': 'share'})
+ data = {"username": "alice", "role": ReadOnlyRole.name}
+ request = self.factory.post("/", data=data, **self.extra)
+ share_view = ProjectViewSet.as_view({"post": "share"})
project_id = self.project.pk
response = share_view(request, pk=project_id)
self.assertEqual(response.status_code, 204)
- self.assertTrue(
- ReadOnlyRole.user_has_role(alice_profile.user, self.project))
+ self.assertTrue(ReadOnlyRole.user_has_role(alice_profile.user, self.project))
- request = self.factory.get('/', {'formID': self.xform.id_string})
+ request = self.factory.get("/", {"formID": self.xform.id_string})
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('alice', 'alice')
+ auth = DigestAuth("alice", "alice")
request.META.update(auth(request.META, response))
- response = self.view(request, username='alice')
+ response = self.view(request, username="alice")
self.assertEqual(response.status_code, 200)
path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList2.xml')
+ os.path.dirname(__file__), "..", "fixtures", "formList2.xml"
+ )
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list = f.read().strip()
- data = {"hash": self.xform.hash, "pk": self.xform.pk,
- "version": self.xform.version}
- content = response.render().content.decode('utf-8')
+ data = {
+ "hash": self.xform.hash,
+ "pk": self.xform.pk,
+ "version": self.xform.version,
+ }
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list % data)
# Bob's profile
@@ -268,39 +282,44 @@ def test_form_id_filter_for_require_auth_account(self):
# Submit form as Alice
self._login_user_and_profile(extra_post_data=alice_data)
- self.assertEqual(self.user.username, 'alice')
+ self.assertEqual(self.user.username, "alice")
path = os.path.join(
- settings.PROJECT_ROOT, "apps", "main", "tests", "fixtures",
- "good_eats_multilang", "good_eats_multilang.xlsx")
+ settings.PROJECT_ROOT,
+ "apps",
+ "main",
+ "tests",
+ "fixtures",
+ "good_eats_multilang",
+ "good_eats_multilang.xlsx",
+ )
self._publish_xls_form_to_project(xlsform_path=path)
- self.assertTrue(OwnerRole.user_has_role(alice_profile.user,
- self.xform))
+ self.assertTrue(OwnerRole.user_has_role(alice_profile.user, self.xform))
# Share Alice's form with Bob
ReadOnlyRole.add(bob_profile, self.xform)
self.assertTrue(ReadOnlyRole.user_has_role(bob_profile, self.xform))
# Get unrecognized formID as bob
- request = self.factory.get('/', {'formID': 'unrecognizedID'})
+ request = self.factory.get("/", {"formID": "unrecognizedID"})
response = self.view(request, username=bob_profile.username)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request, username=bob_profile.username)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data, [])
# Get Alice's form as Bob
- request = self.factory.get('/', {'formID': 'good_eats_multilang'})
+ request = self.factory.get("/", {"formID": "good_eats_multilang"})
response = self.view(request, username=bob_profile.username)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request, username=bob_profile.username)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(response.data[0]['formID'], 'good_eats_multilang')
+ self.assertEqual(response.data[0]["formID"], "good_eats_multilang")
def test_get_xform_list_project_pk_filter(self):
"""
@@ -308,36 +327,33 @@ def test_get_xform_list_project_pk_filter(self):
"""
self.user.profile.require_auth = True
self.user.profile.save()
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request, project_pk=self.project.pk)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request, project_pk=self.project.pk)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
# existing form is in result when xform_pk filter is in use.
response = self.view(request, project_pk=self.project.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList.xml')
+ path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "formList.xml")
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list_xml = f.read().strip()
data = {"hash": self.xform.hash, "pk": self.xform.pk}
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list_xml % data)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'],
- 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_get_xform_list_xform_pk_filter(self):
"""
@@ -345,52 +361,58 @@ def test_get_xform_list_xform_pk_filter(self):
"""
self.user.profile.require_auth = True
self.user.profile.save()
- request = self.factory.get('/')
- response = self.view(request, username=self.user.username,
- xform_pk=self.xform.pk)
+ request = self.factory.get("/")
+ response = self.view(
+ request, username=self.user.username, xform_pk=self.xform.pk
+ )
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
- response = self.view(request, username=self.user.username,
- xform_pk=self.xform.pk + 10000)
+ response = self.view(
+ request, username=self.user.username, xform_pk=self.xform.pk + 10000
+ )
self.assertEqual(response.status_code, 404)
- request = self.factory.get('/')
- response = self.view(request, username=self.user.username,
- xform_pk=self.xform.pk)
+ request = self.factory.get("/")
+ response = self.view(
+ request, username=self.user.username, xform_pk=self.xform.pk
+ )
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
# existing form is in result when xform_pk filter is in use.
- response = self.view(request, username=self.user.username,
- xform_pk=self.xform.pk)
+ response = self.view(
+ request, username=self.user.username, xform_pk=self.xform.pk
+ )
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList.xml')
+ path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "formList.xml")
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list_xml = f.read().strip()
data = {"hash": self.xform.hash, "pk": self.xform.pk}
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list_xml % data)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'],
- 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_get_xform_list_of_logged_in_user_with_username_param(self):
# publish 2 forms as bob
- xls_path = os.path.join(settings.PROJECT_ROOT, "apps", "main", "tests",
- "fixtures", "tutorial.xlsx")
+ xls_path = os.path.join(
+ settings.PROJECT_ROOT, "apps", "main", "tests", "fixtures", "tutorial.xlsx"
+ )
self._publish_xls_form_to_project(xlsform_path=xls_path)
- xls_file_path = os.path.join(settings.PROJECT_ROOT, "apps", "logger",
- "fixtures",
- "external_choice_form_v1.xlsx")
+ xls_file_path = os.path.join(
+ settings.PROJECT_ROOT,
+ "apps",
+ "logger",
+ "fixtures",
+ "external_choice_form_v1.xlsx",
+ )
self._publish_xls_form_to_project(xlsform_path=xls_file_path)
# change one of bob's forms to public
@@ -402,82 +424,81 @@ def test_get_xform_list_of_logged_in_user_with_username_param(self):
# check that bob still has 2 private forms
self.assertEqual(self.user.xforms.filter(shared=False).count(), 2)
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
- response = self.view(request, username='bob')
+ response = self.view(request, username="bob")
# check that bob's request is succesful and it returns both public and
# private forms that belong to bob
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 3)
alice_data = {
- 'username': 'alice',
- 'email': 'alice@localhost.com',
+ "username": "alice",
+ "email": "alice@localhost.com",
}
self._login_user_and_profile(extra_post_data=alice_data)
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('alice', 'bobbob')
+ auth = DigestAuth("alice", "bobbob")
request.META.update(auth(request.META, response))
- response = self.view(request, username='bob')
+ response = self.view(request, username="bob")
# check that alice's request is succesful and it returns public forms
# owned by bob
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(response.data[0].get('formID'), xform_id_string)
+ self.assertEqual(response.data[0].get("formID"), xform_id_string)
def test_get_xform_list_with_malformed_cookie(self):
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- request.COOKIES['__enketo'] = 'hello'
+ request.COOKIES["__enketo"] = "hello"
response = self.view(request)
self.assertEqual(response.status_code, 401)
self.assertEqual(
- response.data.get('detail'),
- u'JWT DecodeError: Not enough segments')
+ response.data.get("detail"), "JWT DecodeError: Not enough segments"
+ )
- @patch('onadata.apps.api.viewsets.project_viewset.send_mail')
+ @patch("onadata.apps.api.viewsets.project_viewset.send_mail")
def test_read_only_users_get_non_empty_formlist_using_preview_formlist(
- self, mock_send_mail):
+ self, mock_send_mail
+ ):
alice_data = {
- 'username': 'alice',
- 'email': 'alice@localhost.com',
- 'password1': 'alice',
- 'password2': 'alice'
+ "username": "alice",
+ "email": "alice@localhost.com",
+ "password1": "alice",
+ "password2": "alice",
}
alice_profile = self._create_user_profile(alice_data)
- self.assertFalse(
- ReadOnlyRole.user_has_role(alice_profile.user, self.project))
+ self.assertFalse(ReadOnlyRole.user_has_role(alice_profile.user, self.project))
# share bob's project with alice
data = {
- 'username': 'alice',
- 'role': ReadOnlyRole.name,
- 'email_msg': 'I have shared the project with you'
+ "username": "alice",
+ "role": ReadOnlyRole.name,
+ "email_msg": "I have shared the project with you",
}
- request = self.factory.post('/', data=data, **self.extra)
- share_view = ProjectViewSet.as_view({'post': 'share'})
+ request = self.factory.post("/", data=data, **self.extra)
+ share_view = ProjectViewSet.as_view({"post": "share"})
projectid = self.project.pk
response = share_view(request, pk=projectid)
self.assertEqual(response.status_code, 204)
self.assertTrue(mock_send_mail.called)
- self.assertTrue(
- ReadOnlyRole.user_has_role(alice_profile.user, self.project))
+ self.assertTrue(ReadOnlyRole.user_has_role(alice_profile.user, self.project))
# check that she can authenticate successfully
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('alice', 'alice')
+ auth = DigestAuth("alice", "alice")
request.META.update(auth(request.META, response))
- response = self.view(request, username='bob')
+ response = self.view(request, username="bob")
self.assertEqual(response.status_code, 200)
# check that alice gets an empty response when requesting bob's
# formlist
@@ -486,250 +507,225 @@ def test_read_only_users_get_non_empty_formlist_using_preview_formlist(
# set endpoint to preview formList
self.view = PreviewXFormListViewSet.as_view({"get": "list"})
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
self.assertNotEqual(response.data, [])
- auth = DigestAuth('alice', 'alice')
+ auth = DigestAuth("alice", "alice")
request.META.update(auth(request.META, response))
- response = self.view(request, username='bob')
+ response = self.view(request, username="bob")
self.assertEqual(response.status_code, 200)
# check that alice does NOT get an empty response when requesting bob's
# formlist when using the preview formlist endpoint
self.assertNotEqual(response.data, [])
- @patch('onadata.apps.api.viewsets.project_viewset.send_mail')
+ @patch("onadata.apps.api.viewsets.project_viewset.send_mail")
def test_get_xform_list_with_shared_forms(self, mock_send_mail):
# create user alice
alice_data = {
- 'username': 'alice',
- 'email': 'alice@localhost.com',
- 'password1': 'alice',
- 'password2': 'alice'
+ "username": "alice",
+ "email": "alice@localhost.com",
+ "password1": "alice",
+ "password2": "alice",
}
alice_profile = self._create_user_profile(alice_data)
# check that she can authenticate successfully
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('alice', 'alice')
+ auth = DigestAuth("alice", "alice")
request.META.update(auth(request.META, response))
response = self.view(request)
self.assertEqual(response.status_code, 200)
- self.assertFalse(
- ReadOnlyRole.user_has_role(alice_profile.user, self.project))
+ self.assertFalse(ReadOnlyRole.user_has_role(alice_profile.user, self.project))
# share bob's project with her
data = {
- 'username': 'alice',
- 'role': ReadOnlyRole.name,
- 'email_msg': 'I have shared the project with you'
+ "username": "alice",
+ "role": ReadOnlyRole.name,
+ "email_msg": "I have shared the project with you",
}
- request = self.factory.post('/', data=data, **self.extra)
- share_view = ProjectViewSet.as_view({'post': 'share'})
+ request = self.factory.post("/", data=data, **self.extra)
+ share_view = ProjectViewSet.as_view({"post": "share"})
projectid = self.project.pk
response = share_view(request, pk=projectid)
self.assertEqual(response.status_code, 204)
self.assertTrue(mock_send_mail.called)
- self.assertTrue(
- ReadOnlyRole.user_has_role(alice_profile.user, self.project))
+ self.assertTrue(ReadOnlyRole.user_has_role(alice_profile.user, self.project))
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('alice', 'alice')
+ auth = DigestAuth("alice", "alice")
request.META.update(auth(request.META, response))
- response = self.view(request, username='alice')
+ response = self.view(request, username="alice")
self.assertEqual(response.status_code, 200)
- path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList.xml')
+ path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "formList.xml")
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list_xml = f.read().strip()
data = {"hash": self.xform.hash, "pk": self.xform.pk}
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list_xml % data)
- download_url = ('http://testserver/%s/'
- 'forms/%s/form.xml') % (
- self.user.username, self.xform.id)
+ download_url = (
+ "http://testserver/%s/" "forms/%s/form.xml"
+ ) % (self.user.username, self.xform.id)
# check that bob's form exists in alice's formList
self.assertTrue(download_url in content)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'],
- 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_get_xform_list_inactive_form(self):
self.xform.downloadable = False
self.xform.save()
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request)
self.assertEqual(response.status_code, 200)
- xml = u'\n'
- content = response.render().content.decode('utf-8')
+ xml = '\n'
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, xml)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'], 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_get_xform_list_anonymous_user(self):
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 200)
- path = os.path.join(
- os.path.dirname(__file__), '..', 'fixtures', 'formList.xml')
+ path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "formList.xml")
- with open(path, encoding='utf-8') as f:
+ with open(path, encoding="utf-8") as f:
form_list_xml = f.read().strip()
data = {"hash": self.xform.hash, "pk": self.xform.pk}
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertEqual(content, form_list_xml % data)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'],
- 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_get_xform_list_anonymous_user_require_auth(self):
self.user.profile.require_auth = True
self.user.profile.save()
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
self.assertEqual(response.status_code, 401)
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 401)
def test_get_xform_list_other_user_with_no_role(self):
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request)
- alice_data = {'username': 'alice', 'email': 'alice@localhost.com'}
+ alice_data = {"username": "alice", "email": "alice@localhost.com"}
alice_profile = self._create_user_profile(alice_data)
- self.assertFalse(
- ReadOnlyRole.user_has_role(alice_profile.user, self.xform))
+ self.assertFalse(ReadOnlyRole.user_has_role(alice_profile.user, self.xform))
- auth = DigestAuth('alice', 'bobbob')
+ auth = DigestAuth("alice", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request)
self.assertEqual(response.status_code, 200)
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
self.assertNotIn(self.xform.id_string, content)
- self.assertIn('\n\n\n\n
-screenshot.png%(hash)shttp://testserver/bob/xformsMedia/%(xform)s/%(pk)s.png""" # noqa
+ manifest_xml = """screenshot.png%(hash)shttp://testserver/bob/xformsMedia/%(xform)s/%(pk)s.png""" # noqa
data = {
"hash": self.metadata.hash,
"pk": self.metadata.pk,
- "xform": self.xform.pk
+ "xform": self.xform.pk,
}
- content = response.render().content.decode('utf-8').strip()
+ content = "".join(
+ [i.decode("utf-8").strip() for i in response.streaming_content]
+ )
self.assertEqual(content, manifest_xml % data)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'], 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_retrieve_xform_manifest_anonymous_user(self):
self._load_metadata(self.xform)
self.view = XFormListViewSet.as_view({"get": "manifest"})
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request, pk=self.xform.pk)
self.assertEqual(response.status_code, 401)
- response = self.view(
- request, pk=self.xform.pk, username=self.user.username)
+ response = self.view(request, pk=self.xform.pk, username=self.user.username)
self.assertEqual(response.status_code, 200)
- manifest_xml = """
-screenshot.png%(hash)shttp://testserver/bob/xformsMedia/%(xform)s/%(pk)s.png""" # noqa
+ manifest_xml = """screenshot.png%(hash)shttp://testserver/bob/xformsMedia/%(xform)s/%(pk)s.png""" # noqa
data = {
"hash": self.metadata.hash,
"pk": self.metadata.pk,
- "xform": self.xform.pk
+ "xform": self.xform.pk,
}
- content = response.render().content.decode('utf-8').strip()
+ content = "".join(
+ [i.decode("utf-8").strip() for i in response.streaming_content]
+ )
self.assertEqual(content, manifest_xml % data)
- self.assertTrue(response.has_header('X-OpenRosa-Version'))
- self.assertTrue(
- response.has_header('X-OpenRosa-Accept-Content-Length'))
- self.assertTrue(response.has_header('Date'))
- self.assertEqual(response['Content-Type'], 'text/xml; charset=utf-8')
+ self.assertTrue(response.has_header("X-OpenRosa-Version"))
+ self.assertTrue(response.has_header("X-OpenRosa-Accept-Content-Length"))
+ self.assertTrue(response.has_header("Date"))
+ self.assertEqual(response["Content-Type"], "text/xml; charset=utf-8")
def test_retrieve_xform_manifest_anonymous_user_require_auth(self):
self.user.profile.require_auth = True
self.user.profile.save()
self._load_metadata(self.xform)
self.view = XFormListViewSet.as_view({"get": "manifest"})
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(request, pk=self.xform.pk)
self.assertEqual(response.status_code, 401)
- response = self.view(
- request, pk=self.xform.pk, username=self.user.username)
+ response = self.view(request, pk=self.xform.pk, username=self.user.username)
self.assertEqual(response.status_code, 401)
def test_retrieve_xform_media(self):
self._load_metadata(self.xform)
- self.view = XFormListViewSet.as_view(
- {
- "get": "media",
- "head": "media"
- }
- )
- request = self.factory.head('/')
+ self.view = XFormListViewSet.as_view({"get": "media", "head": "media"})
+ request = self.factory.head("/")
response = self.view(
- request, pk=self.xform.pk, metadata=self.metadata.pk, format='png')
- auth = DigestAuth('bob', 'bobbob')
- request = self.factory.get('/')
+ request, pk=self.xform.pk, metadata=self.metadata.pk, format="png"
+ )
+ auth = DigestAuth("bob", "bobbob")
+ request = self.factory.get("/")
request.META.update(auth(request.META, response))
response = self.view(
- request, pk=self.xform.pk, metadata=self.metadata.pk, format='png')
+ request, pk=self.xform.pk, metadata=self.metadata.pk, format="png"
+ )
self.assertEqual(response.status_code, 200)
def test_retrieve_xform_media_anonymous_user(self):
self._load_metadata(self.xform)
self.view = XFormListViewSet.as_view({"get": "media"})
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(
- request, pk=self.xform.pk, metadata=self.metadata.pk, format='png')
+ request, pk=self.xform.pk, metadata=self.metadata.pk, format="png"
+ )
self.assertEqual(response.status_code, 401)
response = self.view(
@@ -835,7 +822,8 @@ def test_retrieve_xform_media_anonymous_user(self):
pk=self.xform.pk,
username=self.user.username,
metadata=self.metadata.pk,
- format='png')
+ format="png",
+ )
self.assertEqual(response.status_code, 200)
def test_retrieve_xform_media_anonymous_user_require_auth(self):
@@ -843,100 +831,103 @@ def test_retrieve_xform_media_anonymous_user_require_auth(self):
self.user.profile.save()
self._load_metadata(self.xform)
self.view = XFormListViewSet.as_view({"get": "media"})
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = self.view(
- request, pk=self.xform.pk, metadata=self.metadata.pk, format='png')
+ request, pk=self.xform.pk, metadata=self.metadata.pk, format="png"
+ )
self.assertEqual(response.status_code, 401)
def test_retrieve_xform_media_linked_xform(self):
- data_type = 'media'
- data_value = 'xform {} transportation'.format(self.xform.pk)
+ data_type = "media"
+ data_value = "xform {} transportation".format(self.xform.pk)
self._add_form_metadata(self.xform, data_type, data_value)
self._make_submissions()
self.xform.refresh_from_db()
- self.view = XFormListViewSet.as_view(
- {
- "get": "manifest",
- "head": "manifest"
- }
- )
- request = self.factory.head('/')
+ self.view = XFormListViewSet.as_view({"get": "manifest", "head": "manifest"})
+ request = self.factory.head("/")
response = self.view(request, pk=self.xform.pk)
- auth = DigestAuth('bob', 'bobbob')
- request = self.factory.get('/')
+ auth = DigestAuth("bob", "bobbob")
+ request = self.factory.get("/")
request.META.update(auth(request.META, response))
response = self.view(request, pk=self.xform.pk)
self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data[0]['filename'], 'transportation.csv')
- self.assertEqual(
- response.data[0]['hash'], 'md5:%s' % md5(
- self.xform.last_submission_time.isoformat().encode(
- 'utf-8')).hexdigest())
-
- self.view = XFormListViewSet.as_view(
- {
- "get": "media",
- "head": "media"
- }
+
+ manifest_xml = """{}{}{}""" # noqa
+ expected_downloadUrl = f"http://testserver/bob/xformsMedia/{self.xform.pk}/{self.metadata.pk}.csv?group_delimiter=.&repeat_index_tags=_,_" # noqa
+ expected_hash = md5(
+ self.xform.last_submission_time.isoformat().encode("utf-8")
+ ).hexdigest()
+ expected_content = manifest_xml.format(
+ "transportation.csv", f"md5:{expected_hash}", expected_downloadUrl
)
- request = self.factory.get('/')
+ content = "".join(
+ [i.decode("utf-8").strip() for i in response.streaming_content]
+ )
+ self.assertEqual(content, expected_content)
+
+ self.view = XFormListViewSet.as_view({"get": "media", "head": "media"})
+ request = self.factory.get("/")
response = self.view(
- request, pk=self.xform.pk, metadata=self.metadata.pk, format='csv')
+ request, pk=self.xform.pk, metadata=self.metadata.pk, format="csv"
+ )
self.assertEqual(response.status_code, 401)
- request = self.factory.head('/')
+ request = self.factory.head("/")
response = self.view(
- request, pk=self.xform.pk, metadata=self.metadata.pk, format='csv')
- auth = DigestAuth('bob', 'bobbob')
- request = self.factory.get('/')
+ request, pk=self.xform.pk, metadata=self.metadata.pk, format="csv"
+ )
+ auth = DigestAuth("bob", "bobbob")
+ request = self.factory.get("/")
request.META.update(auth(request.META, response))
response = self.view(
- request, pk=self.xform.pk, metadata=self.metadata.pk, format='csv')
+ request, pk=self.xform.pk, metadata=self.metadata.pk, format="csv"
+ )
self.assertEqual(response.status_code, 200)
- self.assertEqual(response['Content-Disposition'],
- 'attachment; filename=transportation.csv')
+ self.assertEqual(
+ response["Content-Disposition"], "attachment; filename=transportation.csv"
+ )
def test_retrieve_xform_manifest_linked_form(self):
# for linked forms check if manifest media download url for csv
# has a group_delimiter param
- data_type = 'media'
- data_value = 'xform {} transportation'.format(self.xform.pk)
- media = self._add_form_metadata(self.xform, data_type, data_value)
-
- self.view = XFormListViewSet.as_view(
- {
- "get": "manifest",
- "head": "manifest"
- }
- )
+ data_type = "media"
+ data_value = "xform {} transportation".format(self.xform.pk)
+ self._add_form_metadata(self.xform, data_type, data_value)
+
+ self.view = XFormListViewSet.as_view({"get": "manifest", "head": "manifest"})
# sign in bob
- request = self.factory.head('/')
+ request = self.factory.head("/")
auth_response = self.view(request, pk=self.xform.pk)
- auth = DigestAuth('bob', 'bobbob')
+ auth = DigestAuth("bob", "bobbob")
# set up bob's request
- request = self.factory.get('/xformsManifest')
+ request = self.factory.get("/xformsManifest")
request.META.update(auth(request.META, auth_response))
# make request
- response = self.view(request, pk=self.xform.pk, format='csv')
+ response = self.view(request, pk=self.xform.pk)
# test
- manifest_media_url = '{}{}'.format(
- media.data['media_url'],
- '?group_delimiter=.&repeat_index_tags=_,_')
- download_url = response.data[0]['downloadUrl']
- self.assertEqual(manifest_media_url, download_url)
+ manifest_xml = """{}{}{}""" # noqa
+ expected_downloadUrl = f"http://testserver/bob/xformsMedia/{self.xform.pk}/{self.metadata.pk}.csv?group_delimiter=.&repeat_index_tags=_,_" # noqa
+ expected_content = manifest_xml.format(
+ "transportation.csv", "md5:", expected_downloadUrl
+ )
+ content = "".join(
+ [i.decode("utf-8").strip() for i in response.streaming_content]
+ )
+ self.assertEqual(content, expected_content)
- url = '/bob/xformsMedia/{}/{}.csv?group_delimiter=.'\
- .format(self.xform.pk, self.metadata.pk)
- username = 'bob'
- password = 'bob'
+ url = "/bob/xformsMedia/{}/{}.csv?group_delimiter=.".format(
+ self.xform.pk, self.metadata.pk
+ )
+ username = "bob"
+ password = "bob"
client = DigestClient()
- client.set_authorization(username, password, 'Digest')
+ client.set_authorization(username, password, "Digest")
req = client.get(url)
self.assertEqual(req.status_code, 200)
@@ -949,28 +940,18 @@ def test_retrieve_xform_manifest_linked_form(self):
self.assertEqual(req.status_code, 401)
def test_xform_3gp_media_type(self):
-
for fmt in ["png", "jpg", "mp3", "3gp", "wav"]:
url = reverse(
- 'xform-media',
- kwargs={
- 'username': 'bob',
- 'pk': 1,
- 'metadata': '1234',
- 'format': fmt
- })
+ "xform-media",
+ kwargs={"username": "bob", "pk": 1, "metadata": "1234", "format": fmt},
+ )
- self.assertEqual(url, '/bob/xformsMedia/1/1234.{}'.format(fmt))
+ self.assertEqual(url, "/bob/xformsMedia/1/1234.{}".format(fmt))
def test_get_xform_anonymous_user_xform_require_auth(self):
- self.view = XFormListViewSet.as_view(
- {
- "get": "retrieve",
- "head": "retrieve"
- }
- )
- request = self.factory.head('/')
- response = self.view(request, username='bob', pk=self.xform.pk)
+ self.view = XFormListViewSet.as_view({"get": "retrieve", "head": "retrieve"})
+ request = self.factory.head("/")
+ response = self.view(request, username="bob", pk=self.xform.pk)
# no authentication prompted
self.assertEqual(response.status_code, 200)
@@ -980,15 +961,15 @@ def test_get_xform_anonymous_user_xform_require_auth(self):
self.xform.require_auth = True
self.xform.save()
- request = self.factory.head('/')
- response = self.view(request, username='bob', pk=self.xform.pk)
+ request = self.factory.head("/")
+ response = self.view(request, username="bob", pk=self.xform.pk)
# authentication prompted
self.assertEqual(response.status_code, 401)
- auth = DigestAuth('bob', 'bobbob')
- request = self.factory.get('/')
+ auth = DigestAuth("bob", "bobbob")
+ request = self.factory.get("/")
request.META.update(auth(request.META, response))
- response = self.view(request, username='bob', pk=self.xform.pk)
+ response = self.view(request, username="bob", pk=self.xform.pk)
# success with authentication
self.assertEqual(response.status_code, 200)
@@ -997,24 +978,24 @@ def test_manifest_url_tag_is_not_present_when_no_media(self):
Test that content does not contain a manifest url
only when the form has no media
"""
- request = self.factory.get('/')
+ request = self.factory.get("/")
view = XFormListViewSet.as_view({"get": "list"})
- response = view(request, username='bob', pk=self.xform.pk)
+ response = view(request, username="bob", pk=self.xform.pk)
self.assertEqual(response.status_code, 200)
- content = response.render().content.decode('utf-8')
- manifest_url = ('')
+ content = response.render().content.decode("utf-8")
+ manifest_url = ""
self.assertNotIn(manifest_url, content)
# Add media and test that manifest url exists
- data_type = 'media'
- data_value = 'xform {} transportation'.format(self.xform.pk)
+ data_type = "media"
+ data_value = "xform {} transportation".format(self.xform.pk)
self._add_form_metadata(self.xform, data_type, data_value)
- response = view(request, username='bob', pk=self.xform.pk)
+ response = view(request, username="bob", pk=self.xform.pk)
self.assertEqual(response.status_code, 200)
- content = response.render().content.decode('utf-8')
+ content = response.render().content.decode("utf-8")
manifest_url = (
- 'http://testserver/%s/xformsManifest'
- '/%s') % (self.user.username, self.xform.id)
+ "http://testserver/%s/xformsManifest" "/%s"
+ ) % (self.user.username, self.xform.id)
self.assertTrue(manifest_url in content)
def test_form_list_case_insensitivity(self):
@@ -1022,22 +1003,17 @@ def test_form_list_case_insensitivity(self):
Test that the /formList endpoint utilizes the username in a
case insensitive manner
"""
- request = self.factory.get(
- f'/{self.user.username}/formList', **self.extra)
+ request = self.factory.get(f"/{self.user.username}/formList", **self.extra)
response = self.view(request, username=self.user.username)
self.assertEqual(response.status_code, 200)
- request = self.factory.get(
- f'/{self.user.username.capitalize()}', **self.extra)
- response_2 = self.view(
- request, username=self.user.username.capitalize())
+ request = self.factory.get(f"/{self.user.username.capitalize()}", **self.extra)
+ response_2 = self.view(request, username=self.user.username.capitalize())
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data, response_2.data)
- request = self.factory.get(
- f'/{self.user.username.swapcase()}', **self.extra)
- response_3 = self.view(
- request, username=self.user.username.capitalize())
+ request = self.factory.get(f"/{self.user.username.swapcase()}", **self.extra)
+ response_3 = self.view(request, username=self.user.username.capitalize())
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data, response_3.data)
@@ -1047,13 +1023,18 @@ def test_retrieve_form_using_pk(self):
a form properly
"""
# Bob submit forms
- xls_path = os.path.join(settings.PROJECT_ROOT, "apps", "main", "tests",
- "fixtures", "tutorial.xlsx")
+ xls_path = os.path.join(
+ settings.PROJECT_ROOT, "apps", "main", "tests", "fixtures", "tutorial.xlsx"
+ )
self._publish_xls_form_to_project(xlsform_path=xls_path)
- xls_file_path = os.path.join(settings.PROJECT_ROOT, "apps", "logger",
- "fixtures",
- "external_choice_form_v1.xlsx")
+ xls_file_path = os.path.join(
+ settings.PROJECT_ROOT,
+ "apps",
+ "logger",
+ "fixtures",
+ "external_choice_form_v1.xlsx",
+ )
self._publish_xls_form_to_project(xlsform_path=xls_file_path)
# Set require auth to true for form owner
@@ -1063,10 +1044,8 @@ def test_retrieve_form_using_pk(self):
# Ensure that anonymous users do not have access to private forms
self.xform.shared = False
self.xform.save()
- request = self.factory.get(
- f'/enketo/{self.xform.pk}/formList')
- response = self.view(
- request, xform_pk=self.xform.pk)
+ request = self.factory.get(f"/enketo/{self.xform.pk}/formList")
+ response = self.view(request, xform_pk=self.xform.pk)
self.assertEqual(response.status_code, 401)
# Set require auth to false for form owner
@@ -1079,34 +1058,28 @@ def test_retrieve_form_using_pk(self):
# Ensure logged in users have access to the form
alice_data = {
- 'username': 'alice',
- 'email': 'alice@localhost.com',
- 'password1': 'alice',
- 'password2': 'alice'
+ "username": "alice",
+ "email": "alice@localhost.com",
+ "password1": "alice",
+ "password2": "alice",
}
self._create_user_profile(alice_data)
- auth = DigestAuth('alice', 'alice')
- request = self.factory.get(
- f'/enketo/{self.xform.pk}/formList')
+ auth = DigestAuth("alice", "alice")
+ request = self.factory.get(f"/enketo/{self.xform.pk}/formList")
request.META.update(auth(request.META, response))
- response = self.view(
- request, xform_pk=self.xform.pk)
+ response = self.view(request, xform_pk=self.xform.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- response.data[0]['formID'], self.xform.id_string)
+ self.assertEqual(response.data[0]["formID"], self.xform.id_string)
# Ensure anonymous users have access to public forms
# when require_auth is False
- request = self.factory.get(
- f'/enketo/{self.xform.pk}/formList')
- response = self.view(
- request, xform_pk=self.xform.pk)
+ request = self.factory.get(f"/enketo/{self.xform.pk}/formList")
+ response = self.view(request, xform_pk=self.xform.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- response.data[0]['formID'], self.xform.id_string)
+ self.assertEqual(response.data[0]["formID"], self.xform.id_string)
def test_retrieve_form_in_forms_formlist_endpoint(self):
"""
@@ -1114,13 +1087,18 @@ def test_retrieve_form_in_forms_formlist_endpoint(self):
a form properly
"""
# Bob submit forms
- xls_path = os.path.join(settings.PROJECT_ROOT, "apps", "main", "tests",
- "fixtures", "tutorial.xlsx")
+ xls_path = os.path.join(
+ settings.PROJECT_ROOT, "apps", "main", "tests", "fixtures", "tutorial.xlsx"
+ )
self._publish_xls_form_to_project(xlsform_path=xls_path)
- xls_file_path = os.path.join(settings.PROJECT_ROOT, "apps", "logger",
- "fixtures",
- "external_choice_form_v1.xlsx")
+ xls_file_path = os.path.join(
+ settings.PROJECT_ROOT,
+ "apps",
+ "logger",
+ "fixtures",
+ "external_choice_form_v1.xlsx",
+ )
self._publish_xls_form_to_project(xlsform_path=xls_file_path)
# Set require auth to true for form owner
@@ -1130,10 +1108,8 @@ def test_retrieve_form_in_forms_formlist_endpoint(self):
# Ensure that anonymous users do not have access to private forms
self.xform.shared = False
self.xform.save()
- request = self.factory.get(
- f'/forms/{self.xform.pk}/formList')
- response = self.view(
- request, xform_pk=self.xform.pk)
+ request = self.factory.get(f"/forms/{self.xform.pk}/formList")
+ response = self.view(request, xform_pk=self.xform.pk)
self.assertEqual(response.status_code, 401)
# Set require auth to false for form owner
@@ -1146,34 +1122,28 @@ def test_retrieve_form_in_forms_formlist_endpoint(self):
# Ensure logged in users have access to the form
alice_data = {
- 'username': 'alice',
- 'email': 'alice@localhost.com',
- 'password1': 'alice',
- 'password2': 'alice'
+ "username": "alice",
+ "email": "alice@localhost.com",
+ "password1": "alice",
+ "password2": "alice",
}
self._create_user_profile(alice_data)
- auth = DigestAuth('alice', 'alice')
- request = self.factory.get(
- f'/forms/{self.xform.pk}/formList')
+ auth = DigestAuth("alice", "alice")
+ request = self.factory.get(f"/forms/{self.xform.pk}/formList")
request.META.update(auth(request.META, response))
- response = self.view(
- request, xform_pk=self.xform.pk)
+ response = self.view(request, xform_pk=self.xform.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- response.data[0]['formID'], self.xform.id_string)
+ self.assertEqual(response.data[0]["formID"], self.xform.id_string)
# Ensure anonymous users have access to public forms
# when require_auth is False
- request = self.factory.get(
- f'/forms/{self.xform.pk}/formList')
- response = self.view(
- request, xform_pk=self.xform.pk)
+ request = self.factory.get(f"/forms/{self.xform.pk}/formList")
+ response = self.view(request, xform_pk=self.xform.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- response.data[0]['formID'], self.xform.id_string)
+ self.assertEqual(response.data[0]["formID"], self.xform.id_string)
def test_retrieve_forms_in_project(self):
"""
@@ -1181,13 +1151,18 @@ def test_retrieve_forms_in_project(self):
retrieve forms in a project properly
"""
# Bob submit forms
- xls_path = os.path.join(settings.PROJECT_ROOT, "apps", "main", "tests",
- "fixtures", "tutorial.xlsx")
+ xls_path = os.path.join(
+ settings.PROJECT_ROOT, "apps", "main", "tests", "fixtures", "tutorial.xlsx"
+ )
self._publish_xls_form_to_project(xlsform_path=xls_path)
- xls_file_path = os.path.join(settings.PROJECT_ROOT, "apps", "logger",
- "fixtures",
- "external_choice_form_v1.xlsx")
+ xls_file_path = os.path.join(
+ settings.PROJECT_ROOT,
+ "apps",
+ "logger",
+ "fixtures",
+ "external_choice_form_v1.xlsx",
+ )
self._publish_xls_form_to_project(xlsform_path=xls_file_path)
# Set require auth to true for form owner
@@ -1197,10 +1172,8 @@ def test_retrieve_forms_in_project(self):
# Ensure that anonymous users do not have access to private forms
self.xform.shared = False
self.xform.save()
- request = self.factory.get(
- f'/projects/{self.project.pk}/formList')
- response = self.view(
- request, project_pk=self.project.pk)
+ request = self.factory.get(f"/projects/{self.project.pk}/formList")
+ response = self.view(request, project_pk=self.project.pk)
self.assertEqual(response.status_code, 401)
# Set require auth to false for form owner
@@ -1211,21 +1184,17 @@ def test_retrieve_forms_in_project(self):
self.xform.shared = True
self.xform.save()
# check that logged in user (bob) has access to forms
- auth = DigestAuth('bob', 'bobbob')
- request = self.factory.get(
- f'/projects/{self.project.pk}/formList')
+ auth = DigestAuth("bob", "bobbob")
+ request = self.factory.get(f"/projects/{self.project.pk}/formList")
request.META.update(auth(request.META, response))
- response = self.view(
- request, project_pk=self.project.pk)
+ response = self.view(request, project_pk=self.project.pk)
self.assertEqual(response.status_code, 200)
# check number of forms returned in project are 3
self.assertEqual(len(response.data), 3)
# Ensure anonymous users have access to public forms
# when require_auth is False
- request = self.factory.get(
- f'/projects/{self.project.pk}/formList')
- response = self.view(
- request, project_pk=self.project.pk)
+ request = self.factory.get(f"/projects/{self.project.pk}/formList")
+ response = self.view(request, project_pk=self.project.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 3)
diff --git a/onadata/apps/api/viewsets/xform_list_viewset.py b/onadata/apps/api/viewsets/xform_list_viewset.py
index 3fe92295de..318d2d2c4d 100644
--- a/onadata/apps/api/viewsets/xform_list_viewset.py
+++ b/onadata/apps/api/viewsets/xform_list_viewset.py
@@ -3,7 +3,7 @@
OpenRosa Form List API - https://docs.getodk.org/openrosa-form-list/
"""
from django.conf import settings
-from django.http import Http404
+from django.http import Http404, StreamingHttpResponse
from django.shortcuts import get_object_or_404
from django.views.decorators.cache import never_cache
@@ -77,11 +77,24 @@ def get_object(self):
return obj
- def get_renderers(self):
- if self.action and self.action == "manifest":
- return [XFormManifestRenderer()]
+ def get_serializer_class(self):
+ """Return the class to use for the serializer"""
+ if self.action == "manifest":
+ return XFormManifestSerializer
- return super().get_renderers()
+ return super().get_serializer_class()
+
+ def get_serializer(self, *args, **kwargs):
+ """
+ Return the serializer instance that should be used for validating and
+ deserializing input, and for serializing output.
+ """
+ if self.action == "manifest":
+ kwargs.setdefault("context", self.get_serializer_context())
+ kwargs["context"][GROUP_DELIMETER_TAG] = ExportBuilder.GROUP_DELIMITER_DOT
+ kwargs["context"][REPEAT_INDEX_TAGS] = "_,_"
+
+ return super().get_serializer(*args, **kwargs)
def filter_queryset(self, queryset):
username = self.kwargs.get("username")
@@ -164,13 +177,11 @@ def manifest(self, request, *args, **kwargs):
object_list = MetaData.objects.filter(
data_type="media", object_id=self.object.pk
)
- context = self.get_serializer_context()
- context[GROUP_DELIMETER_TAG] = ExportBuilder.GROUP_DELIMITER_DOT
- context[REPEAT_INDEX_TAGS] = "_,_"
- serializer = XFormManifestSerializer(object_list, many=True, context=context)
- return Response(
- serializer.data, headers=get_openrosa_headers(request, location=False)
+ return StreamingHttpResponse(
+ XFormManifestRenderer().stream_data(object_list, self.get_serializer),
+ content_type="text/xml; charset=utf-8",
+ headers=get_openrosa_headers(request, location=False),
)
@action(methods=["GET", "HEAD"], detail=True)
diff --git a/onadata/libs/renderers/renderers.py b/onadata/libs/renderers/renderers.py
index 46fb86a932..4410db62c2 100644
--- a/onadata/libs/renderers/renderers.py
+++ b/onadata/libs/renderers/renderers.py
@@ -316,8 +316,63 @@ def _to_xml(self, xml, data):
xml.characters(smart_str(data))
+class StreamRendererMixin:
+ """Mixin class for renderers that support stream responses"""
+
+ def _get_current_buffer_data(self):
+ if hasattr(self, "stream"):
+ ret = self.stream.getvalue()
+ self.stream.truncate(0)
+ self.stream.seek(0)
+ return ret
+ return None
+
+ def stream_data(self, data, serializer):
+ """Returns a streaming response."""
+ if data is None:
+ yield ""
+
+ # pylint: disable=attribute-defined-outside-init
+ self.stream = StringIO()
+ xml = SimplerXMLGenerator(self.stream, self.charset)
+ xml.startDocument()
+ yield self._get_current_buffer_data()
+ xml.startElement(self.root_node, {"xmlns": self.xmlns})
+ yield self._get_current_buffer_data()
+ data = iter(data)
+
+ try:
+ out = next(data)
+ except StopIteration:
+ out = None
+
+ while out:
+ try:
+ next_item = next(data)
+ out = serializer(out).data
+ out, attributes = _pop_xml_attributes(out)
+ xml.startElement(self.element_node, attributes)
+ self._to_xml(xml, out)
+ xml.endElement(self.element_node)
+ yield self._get_current_buffer_data()
+ out = next_item
+ except StopIteration:
+ out = serializer(out).data
+ out, attributes = _pop_xml_attributes(out)
+ xml.startElement(self.element_node, attributes)
+ self._to_xml(xml, out)
+ xml.endElement(self.element_node)
+ yield self._get_current_buffer_data()
+ break
+
+ xml.endElement(self.root_node)
+ yield self._get_current_buffer_data()
+ xml.endDocument()
+ yield self._get_current_buffer_data()
+
+
# pylint: disable=too-few-public-methods
-class XFormManifestRenderer(XFormListRenderer):
+class XFormManifestRenderer(XFormListRenderer, StreamRendererMixin):
"""
XFormManifestRenderer - render XFormManifest XML.
"""
@@ -346,7 +401,7 @@ def render(self, data, accepted_media_type=None, renderer_context=None):
return super().render(data, accepted_media_type, renderer_context)
-class InstanceXMLRenderer(XMLRenderer):
+class InstanceXMLRenderer(XMLRenderer, StreamRendererMixin):
"""
InstanceXMLRenderer - Renders Instance XML
"""
@@ -354,14 +409,6 @@ class InstanceXMLRenderer(XMLRenderer):
root_tag_name = "submission-batch"
item_tag_name = "submission-item"
- def _get_current_buffer_data(self):
- if hasattr(self, "stream"):
- ret = self.stream.getvalue()
- self.stream.truncate(0)
- self.stream.seek(0)
- return ret
- return None
-
def stream_data(self, data, serializer):
"""Returns a streaming response."""
if data is None: