Skip to content

Commit

Permalink
feature: add negating search filters
Browse files Browse the repository at this point in the history
using `!` before the field name, the query is negated
  • Loading branch information
mstingl committed Sep 17, 2023
1 parent 101f56f commit 022a2f7
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 98 deletions.
206 changes: 110 additions & 96 deletions djfapi/routing/django.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from collections import defaultdict
from datetime import date
from enum import Enum
from functools import cached_property, wraps
from typing import Any, List, Optional, Tuple, Type, TypeVar, Union
from typing import Any, List, Optional, Tuple, Type, TypeVar, Union, Generator, Dict
import forge
from asgiref.sync import sync_to_async
from pydantic import create_model, constr
from pydantic.fields import Undefined, UndefinedType
from django.db import models, connections, close_old_connections
from django.db import models, connections
from django.db.transaction import atomic
from fastapi import APIRouter, Security, Path, Body, Depends, Query, Response, Request
from fastapi.security.base import SecurityBase
Expand Down Expand Up @@ -81,7 +79,7 @@ def id_field_placeholder(self):
return '/{%s}' % self.id_field

@cached_property
def model_fields(self):
def model_fields(self) -> Enum:
def _get_model_fields(model, prefix='', recursion_tree=None):
if recursion_tree is None:
recursion_tree = []
Expand Down Expand Up @@ -486,7 +484,12 @@ def depends_response_headers(self, method: Method, request: Request, response: R
return response

def endpoint_list(
self, *, access: Optional[Access] = None, pagination: Pagination, search: models.Q = models.Q(), **kwargs
self,
*,
access: Optional[Access] = None,
pagination: Pagination,
search: models.Q = models.Q(),
**kwargs,
):
ids = self._get_ids(kwargs, include_self=False)
return self.list(
Expand All @@ -498,11 +501,24 @@ def endpoint_list(
search=search,
pagination=pagination,
)
]
],
)

def search_filter(self, **kwargs) -> models.Q:
q = models.Q(**remove_none(kwargs))
def search_filter(self, **kwargs: Dict[str, Any]) -> models.Q:
q = models.Q()
for arg, value in kwargs.items():
if value is None:
continue

if arg.startswith('not__'):
query = models.Q(**{arg[5:]: value})
query.negate()

else:
query = models.Q(**{arg[5:]: value})

q &= query

if self.delete_status and 'status__in' in kwargs and kwargs['status__in'] is None:
q &= ~models.Q(status=self.delete_status)

Expand Down Expand Up @@ -536,106 +552,104 @@ def _get_field_variations(self, field: models.Field, field_name: str = None, fie

return variations

def search_filter_fields(self) -> List[forge.FParameter]:
fields = defaultdict(list)
for mfield in self.model_fields:
field: models.Field = mfield.value
if getattr(field, 'primary_key', False) or field.name == 'tenant_id':
continue
def _search_filter_field(self, model_field) -> Generator[Tuple[str, Type, dict], None, None]:
field: models.Field = model_field.value
if field.name == 'tenant_id' or (getattr(field, 'primary_key', False) and self.model != field.model):
return

field_type = get_field_type(field)
field_name = mfield._name_
field_type = get_field_type(field)
field_name = model_field._name_

assert (
isinstance(
field,
(
models.ManyToManyRel,
models.ManyToOneRel,
),
)
or field_type
), f'Field {field.name} on model {self.model} is missing a type annotation'
assert (
isinstance(
field,
(
models.ManyToManyRel,
models.ManyToOneRel,
),
)
or field_type
), f'Field {field.name} on model {self.model} is missing a type annotation'

query_options = {
'default': None,
'include_in_schema': self.do_include_query_fields_in_schema,
}
query_options = {
'default': None,
'include_in_schema': self.do_include_query_fields_in_schema,
}

if isinstance(field, (models.ForeignKey, models.ManyToManyField)):
field_type = List[constr(min_length=field.max_length, max_length=field.max_length)]
field_name += '__id'
query_options.update(alias=field_name)
field_name += '__in'
if isinstance(field, (models.ForeignKey, models.ManyToManyField)):
field_type = List[constr(min_length=field.max_length, max_length=field.max_length)]
field_name += '__id'
query_options.update(alias=field_name)
field_name += '__in'

if self.parent and field.related_model == self.parent.model:
continue
if self.parent and field.related_model == self.parent.model:
return

if isinstance(field, models.ManyToManyField):
continue
if isinstance(field, models.ManyToManyField):
return

if field.null:
fields[field].append(
forge.kwarg(
f'{query_options.get("alias") or field_name}__isnull',
type=Optional[bool],
default=Query(**{**query_options, 'alias': None}),
)
)
if field.null:
_name = f'{query_options.get("alias") or field_name}__isnull'
yield _name, bool, {**query_options, 'alias': _name}

if isinstance(field, (models.ManyToManyRel, models.ManyToOneRel)):
field_name += '__count'

if isinstance(
field,
(
models.DateField,
models.DateTimeField,
models.IntegerField,
models.FloatField,
models.DecimalField,
models.ManyToManyRel,
models.ManyToOneRel,
),
):
for variation in self._get_field_variations(field, field_name, field_type):
name = variation
type_ = field_type
if isinstance(variation, tuple):
name, type_ = variation

yield f'{name}__gte', type_, query_options
yield f'{name}__lte', type_, query_options

elif isinstance(field, models.CharField):
if field.choices or getattr(field, 'primary_key', False):
query_options['alias'] = field_name
if field_name == 'status' and self.delete_status:
query_options[
'description'
] = f"When not set, objects with status {self.delete_status} are excluded"

if isinstance(field, (models.ManyToManyRel, models.ManyToOneRel)):
field_name += '__count'
field_name += '__in'
field_type = List[field_type]

if isinstance(
field,
(
models.DateField,
models.DateTimeField,
models.IntegerField,
models.FloatField,
models.DecimalField,
models.ManyToManyRel,
models.ManyToOneRel,
),
):
for variation in self._get_field_variations(field, field_name, field_type):
name = variation
type_ = field_type
if isinstance(variation, tuple):
name, type_ = variation

fields[field] += [
forge.kwarg(f'{name}__gte', type=Optional[type_], default=Query(**query_options)),
forge.kwarg(f'{name}__lte', type=Optional[type_], default=Query(**query_options)),
]

elif isinstance(field, models.CharField):
if field.choices:
query_options['alias'] = field_name
if field_name == 'status' and self.delete_status:
query_options[
'description'
] = f"When not set, objects with status {self.delete_status} are excluded"

field_name += '__in'
field_type = List[field_type]

else:
query_options['max_length'] = field.max_length
fields[field].append(
forge.kwarg(
f'{field_name}__icontains', type=Optional[field_type], default=Query(**query_options)
)
)
else:
query_options['max_length'] = field.max_length
yield f'{field_name}__icontains', field_type, query_options

fields[field].insert(
0, forge.kwarg(f'{field_name}', type=Optional[field_type], default=Query(**query_options))
)
yield f'{field_name}', field_type, query_options

return [x for xs in fields.values() for x in xs]
def search_filter_fields(self) -> Generator[forge.FParameter, None, None]:
for model_field in self.model_fields:
for name, type_, options in self._search_filter_field(model_field):
yield forge.kwarg(
name,
type=Optional[type_],
default=Query(**options),
)
yield forge.kwarg(
f'not__{name}',
type=Optional[type_],
default=Query(**{**options, 'alias': '!' + options.get('alias', name)}),
)

def create_depends_search(self):
return forge.sign(*self.search_filter_fields())(self.search_filter)
fields = list(self.search_filter_fields())
return forge.sign(*fields)(self.search_filter)

def _depends_search(self):
yield forge.kwarg('search', type=models.Q, default=Depends(self.create_depends_search()))
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = djfapi
version = 0.0.68
version = 0.0.69
author = Manuel Stingl
author_email = opensource@voltane.eu
description = Utilities for use with FastAPI and django
Expand All @@ -23,7 +23,7 @@ python_requires = >=3.9
packages = find:
include_package_data = true
install_requires =
djdantic >= 0.0.12
djdantic >= 0.0.23
fastapi >= 0.63.0
django-health-check >= 3.16
python-forge
Expand Down

0 comments on commit 022a2f7

Please sign in to comment.