diff --git a/.vscode/settings.json b/.vscode/settings.json index de8d7cb..18113f0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -26,10 +26,16 @@ "**/db.sqlite3": true, "**/.DS_Store": true, "**/*.pyc": true, - "**/__pycache__/": true + "**/__pycache__/": true, + "**/build": true, + "**/dist": true, + "**/*.egg-info": true, }, "search.exclude": { "**/.git": true, + "**/build": true, + "**/*.egg-info": true, + "**/dist": true, "**/.venv": true, "**/tmp": true, "htmlcov/*": true, diff --git a/creyPY/fastapi/pagination.py b/creyPY/fastapi/pagination.py index 00820b7..ad339ab 100644 --- a/creyPY/fastapi/pagination.py +++ b/creyPY/fastapi/pagination.py @@ -1,10 +1,21 @@ from math import ceil -from typing import Any, Generic, Optional, Self, Sequence, TypeVar +from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union from fastapi_pagination import Params from fastapi_pagination.bases import AbstractPage, AbstractParams -from fastapi_pagination.types import GreaterEqualOne, GreaterEqualZero +from fastapi_pagination.types import ( + GreaterEqualOne, + GreaterEqualZero, + AdditionalData, + SyncItemsTransformer, +) +from fastapi_pagination.api import create_page, apply_items_transformer +from fastapi_pagination.utils import verify_params +from fastapi_pagination.ext.sqlalchemy import create_paginate_query from pydantic.json_schema import SkipJsonSchema +from sqlalchemy.sql.selectable import Select +from sqlalchemy.orm.session import Session +from sqlalchemy import select, func T = TypeVar("T") @@ -70,3 +81,45 @@ def parse_page(response, page: int, size: int) -> Page: has_next=response.has_next, has_prev=response.has_prev, ) + + +def create_count_query(query: Select) -> Select: + return select(func.count()).select_from(query.subquery()) + + +def unwrap_scalars( + items: Sequence[Sequence[T]], + force_unwrap: bool = True, +) -> Union[Sequence[T], Sequence[Sequence[T]]]: + return [item[0] if force_unwrap else item for item in items] + + +def paginate( + connection: Session, + query: Select, + paginationFlag: bool = True, + params: Optional[AbstractParams] = None, + transformer: Optional[SyncItemsTransformer] = None, + additional_data: Optional[AdditionalData] = None, +): + + params, raw_params = verify_params(params, "limit-offset", "cursor") + + count_query = create_count_query(query) + total = connection.scalar(count_query) + + if paginationFlag is False: + params = Params(page=1, size=total) + + query = create_paginate_query(query, params) + items = connection.execute(query).all() + + items = unwrap_scalars(items) + t_items = apply_items_transformer(items, transformer) + + return create_page( + t_items, + params=params, + total=total, + **(additional_data or {}), + )