From 022a2f771ce2155fce1b53c4c5c3d89c75115e10 Mon Sep 17 00:00:00 2001 From: Manuel Stingl Date: Sun, 17 Sep 2023 15:44:57 +0200 Subject: [PATCH] feature: add negating search filters using `!` before the field name, the query is negated --- djfapi/routing/django.py | 206 +++++++++++++++++++++------------------ setup.cfg | 4 +- 2 files changed, 112 insertions(+), 98 deletions(-) diff --git a/djfapi/routing/django.py b/djfapi/routing/django.py index 4e72d8c..94339b5 100644 --- a/djfapi/routing/django.py +++ b/djfapi/routing/django.py @@ -1,13 +1,11 @@ -from collections import defaultdict from datetime import date from enum import Enum from functools import cached_property, wraps -from typing import Any, List, Optional, Tuple, Type, TypeVar, Union +from typing import Any, List, Optional, Tuple, Type, TypeVar, Union, Generator, Dict import forge -from asgiref.sync import sync_to_async from pydantic import create_model, constr from pydantic.fields import Undefined, UndefinedType -from django.db import models, connections, close_old_connections +from django.db import models, connections from django.db.transaction import atomic from fastapi import APIRouter, Security, Path, Body, Depends, Query, Response, Request from fastapi.security.base import SecurityBase @@ -81,7 +79,7 @@ def id_field_placeholder(self): return '/{%s}' % self.id_field @cached_property - def model_fields(self): + def model_fields(self) -> Enum: def _get_model_fields(model, prefix='', recursion_tree=None): if recursion_tree is None: recursion_tree = [] @@ -486,7 +484,12 @@ def depends_response_headers(self, method: Method, request: Request, response: R return response def endpoint_list( - self, *, access: Optional[Access] = None, pagination: Pagination, search: models.Q = models.Q(), **kwargs + self, + *, + access: Optional[Access] = None, + pagination: Pagination, + search: models.Q = models.Q(), + **kwargs, ): ids = self._get_ids(kwargs, include_self=False) return self.list( @@ -498,11 +501,24 @@ def endpoint_list( search=search, pagination=pagination, ) - ] + ], ) - def search_filter(self, **kwargs) -> models.Q: - q = models.Q(**remove_none(kwargs)) + def search_filter(self, **kwargs: Dict[str, Any]) -> models.Q: + q = models.Q() + for arg, value in kwargs.items(): + if value is None: + continue + + if arg.startswith('not__'): + query = models.Q(**{arg[5:]: value}) + query.negate() + + else: + query = models.Q(**{arg[5:]: value}) + + q &= query + if self.delete_status and 'status__in' in kwargs and kwargs['status__in'] is None: q &= ~models.Q(status=self.delete_status) @@ -536,106 +552,104 @@ def _get_field_variations(self, field: models.Field, field_name: str = None, fie return variations - def search_filter_fields(self) -> List[forge.FParameter]: - fields = defaultdict(list) - for mfield in self.model_fields: - field: models.Field = mfield.value - if getattr(field, 'primary_key', False) or field.name == 'tenant_id': - continue + def _search_filter_field(self, model_field) -> Generator[Tuple[str, Type, dict], None, None]: + field: models.Field = model_field.value + if field.name == 'tenant_id' or (getattr(field, 'primary_key', False) and self.model != field.model): + return - field_type = get_field_type(field) - field_name = mfield._name_ + field_type = get_field_type(field) + field_name = model_field._name_ - assert ( - isinstance( - field, - ( - models.ManyToManyRel, - models.ManyToOneRel, - ), - ) - or field_type - ), f'Field {field.name} on model {self.model} is missing a type annotation' + assert ( + isinstance( + field, + ( + models.ManyToManyRel, + models.ManyToOneRel, + ), + ) + or field_type + ), f'Field {field.name} on model {self.model} is missing a type annotation' - query_options = { - 'default': None, - 'include_in_schema': self.do_include_query_fields_in_schema, - } + query_options = { + 'default': None, + 'include_in_schema': self.do_include_query_fields_in_schema, + } - if isinstance(field, (models.ForeignKey, models.ManyToManyField)): - field_type = List[constr(min_length=field.max_length, max_length=field.max_length)] - field_name += '__id' - query_options.update(alias=field_name) - field_name += '__in' + if isinstance(field, (models.ForeignKey, models.ManyToManyField)): + field_type = List[constr(min_length=field.max_length, max_length=field.max_length)] + field_name += '__id' + query_options.update(alias=field_name) + field_name += '__in' - if self.parent and field.related_model == self.parent.model: - continue + if self.parent and field.related_model == self.parent.model: + return - if isinstance(field, models.ManyToManyField): - continue + if isinstance(field, models.ManyToManyField): + return - if field.null: - fields[field].append( - forge.kwarg( - f'{query_options.get("alias") or field_name}__isnull', - type=Optional[bool], - default=Query(**{**query_options, 'alias': None}), - ) - ) + if field.null: + _name = f'{query_options.get("alias") or field_name}__isnull' + yield _name, bool, {**query_options, 'alias': _name} + + if isinstance(field, (models.ManyToManyRel, models.ManyToOneRel)): + field_name += '__count' + + if isinstance( + field, + ( + models.DateField, + models.DateTimeField, + models.IntegerField, + models.FloatField, + models.DecimalField, + models.ManyToManyRel, + models.ManyToOneRel, + ), + ): + for variation in self._get_field_variations(field, field_name, field_type): + name = variation + type_ = field_type + if isinstance(variation, tuple): + name, type_ = variation + + yield f'{name}__gte', type_, query_options + yield f'{name}__lte', type_, query_options + + elif isinstance(field, models.CharField): + if field.choices or getattr(field, 'primary_key', False): + query_options['alias'] = field_name + if field_name == 'status' and self.delete_status: + query_options[ + 'description' + ] = f"When not set, objects with status {self.delete_status} are excluded" - if isinstance(field, (models.ManyToManyRel, models.ManyToOneRel)): - field_name += '__count' + field_name += '__in' + field_type = List[field_type] - if isinstance( - field, - ( - models.DateField, - models.DateTimeField, - models.IntegerField, - models.FloatField, - models.DecimalField, - models.ManyToManyRel, - models.ManyToOneRel, - ), - ): - for variation in self._get_field_variations(field, field_name, field_type): - name = variation - type_ = field_type - if isinstance(variation, tuple): - name, type_ = variation - - fields[field] += [ - forge.kwarg(f'{name}__gte', type=Optional[type_], default=Query(**query_options)), - forge.kwarg(f'{name}__lte', type=Optional[type_], default=Query(**query_options)), - ] - - elif isinstance(field, models.CharField): - if field.choices: - query_options['alias'] = field_name - if field_name == 'status' and self.delete_status: - query_options[ - 'description' - ] = f"When not set, objects with status {self.delete_status} are excluded" - - field_name += '__in' - field_type = List[field_type] - - else: - query_options['max_length'] = field.max_length - fields[field].append( - forge.kwarg( - f'{field_name}__icontains', type=Optional[field_type], default=Query(**query_options) - ) - ) + else: + query_options['max_length'] = field.max_length + yield f'{field_name}__icontains', field_type, query_options - fields[field].insert( - 0, forge.kwarg(f'{field_name}', type=Optional[field_type], default=Query(**query_options)) - ) + yield f'{field_name}', field_type, query_options - return [x for xs in fields.values() for x in xs] + def search_filter_fields(self) -> Generator[forge.FParameter, None, None]: + for model_field in self.model_fields: + for name, type_, options in self._search_filter_field(model_field): + yield forge.kwarg( + name, + type=Optional[type_], + default=Query(**options), + ) + yield forge.kwarg( + f'not__{name}', + type=Optional[type_], + default=Query(**{**options, 'alias': '!' + options.get('alias', name)}), + ) def create_depends_search(self): - return forge.sign(*self.search_filter_fields())(self.search_filter) + fields = list(self.search_filter_fields()) + return forge.sign(*fields)(self.search_filter) def _depends_search(self): yield forge.kwarg('search', type=models.Q, default=Depends(self.create_depends_search())) diff --git a/setup.cfg b/setup.cfg index a7345dd..ab511cb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = djfapi -version = 0.0.68 +version = 0.0.69 author = Manuel Stingl author_email = opensource@voltane.eu description = Utilities for use with FastAPI and django @@ -23,7 +23,7 @@ python_requires = >=3.9 packages = find: include_package_data = true install_requires = - djdantic >= 0.0.12 + djdantic >= 0.0.23 fastapi >= 0.63.0 django-health-check >= 3.16 python-forge