Skip to content

Commit

Permalink
Enrich OPTIONS by adding allowed methods as metadata
Browse files Browse the repository at this point in the history
cleanup tests; added tests for options given new metadata class
  • Loading branch information
why-not-try-calmer committed Jul 27, 2023
1 parent c9e8135 commit 05fa380
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 19 deletions.
File renamed without changes.
12 changes: 11 additions & 1 deletion src/django_oapif/decorators.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Any, Callable, Dict, Optional

from django.db.models import Model
from rest_framework import viewsets
from rest_framework import response, viewsets
from rest_framework_gis.serializers import GeoFeatureModelSerializer

from django_oapif.metadata import OAPIFMetadata
from django_oapif.mixins import OAPIFDescribeModelViewSetMixin
from django_oapif.urls import oapif_router

Expand Down Expand Up @@ -76,6 +77,15 @@ class Viewset(OAPIFDescribeModelViewSetMixin, viewsets.ModelViewSet):
# Allowing '.' and '-' in urls
lookup_value_regex = r"[\w.-]+"

# Metadata
metadata_class = OAPIFMetadata

def options(self, request, *args, **kwargs) -> response.Response:
allowed_actions = self.metadata_class().determine_actions(request, self)
allowed_actions = ", ".join(allowed_actions.keys())
data = self.metadata_class().determine_metadata(request, self)
return response.Response(data, headers={"Allow": allowed_actions})

# ON HOLD, WAITING ON GeoFeatureModelSerializer to admit of null geometries
"""
# Apply custom serializer attributes
Expand Down
27 changes: 27 additions & 0 deletions src/django_oapif/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from django.core.exceptions import PermissionDenied
from django.http import Http404
from rest_framework import exceptions, metadata
from rest_framework.request import clone_request


class OAPIFMetadata(metadata.SimpleMetadata):
def determine_actions(self, request, view) -> dict:
actions = {}
for method in set(view.allowed_methods):
view.request = clone_request(request, method)
try:
# Test global permissions
if hasattr(view, "check_permissions"):
view.check_permissions(view.request)
# Test object permissions
if hasattr(view, "get_object") and method in {"PUT", "PATCH", "DELETE"}:
view.get_object()
except (exceptions.APIException, PermissionDenied, Http404):
pass
else:
serializer = view.get_serializer()
actions[method] = self.get_serializer_info(serializer)
finally:
view.request = request

return actions
49 changes: 31 additions & 18 deletions src/signalo/core/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

logger = logging.getLogger(__name__)

collections_url = "/oapif/collections"


def is_dense_partial_order(sorted_it: Iterable[int]) -> bool:
prev = 0
Expand Down Expand Up @@ -111,7 +113,7 @@ def serialize_with_profile(

class SpeedTestSerialization(APITestCase):
@classmethod
def setUpClass(cls):
def setUpTestData(cls):
call_command("populate_vl")
call_command("populate_signs_poles", magnitude=30)
cls.poles = Pole.objects.all()
Expand Down Expand Up @@ -142,34 +144,45 @@ def setUpTestData(cls):

cls.demo_viewer = User.objects.get(username="demo_viewer")
cls.admin = User.objects.get(username="admin")
cls.collection_url = collections_url + "/signalo_core.pole"
cls.items_url = cls.collection_url + "/items"

def tearDown(self):
self.client.force_authenticate(user=None)

def test_auth_as_demo_viewer(self):
def test_get_as_viewer(self):
collections_from_anonymous = self.client.get(
"/oapif/collections", format="json"
collections_url, format="json"
).json()
self.client.force_authenticate(user=self.demo_viewer)
collection_response = self.client.get("/oapif/collections", format="json")
oapif_response_code = self.client.get("/oapif/").status_code
collection_response = self.client.get(collections_url, format="json")

self.assertEqual(oapif_response_code, 200)
self.assertEqual(collection_response.status_code, 200)
self.assertEqual(
len(collection_response.json()), len(collections_from_anonymous)
)

def test_auth_as_admin(self):
collections_from_anonymous = self.client.get(
"/oapif/collections", format="json"
).json()
def test_post_as_admin(self):
self.client.force_authenticate(user=self.admin)
collection_response = self.client.get("/oapif/collections", format="json")
oapif_response_code = self.client.get("/oapif/").status_code

self.assertEqual(oapif_response_code, 200)
self.assertEqual(collection_response.status_code, 200)
self.assertEqual(
len(collection_response.json()), len(collections_from_anonymous)
)
data = {"geom": "Point(1300000 600000)", "name": "test123"}
post_to_items = self.client.post(self.items_url, data, format="json")
self.assertIn(post_to_items.status_code, (200, 201))

def test_anonymous_items_options(self):
# Anonymous client
expected = {"GET", "OPTIONS", "HEAD"}
response = self.client.options(self.items_url)
allowed_headers = map(lambda x: x.strip(), response.headers["Allow"].split(","))
allowed_body = response.json()["actions"].keys()
self.assertEqual(set(allowed_body), expected)
self.assertEqual(set(allowed_headers), set(allowed_body))

def test_admin_items_options(self):
# Authenticated client with write permissions
expected = {"POST", "GET", "OPTIONS", "HEAD"}
self.client.force_authenticate(user=self.admin)
response = self.client.options(self.items_url)
allowed_headers = map(lambda x: x.strip(), response.headers["Allow"].split(","))
allowed_body = response.json()["actions"].keys()
self.assertEqual(set(allowed_body), expected)
self.assertEqual(set(allowed_headers), set(allowed_body))

0 comments on commit 05fa380

Please sign in to comment.