diff --git a/.dockerignore b/.dockerignore index ef52199..79b547d 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,5 +1,6 @@ * -!app.py -!endpoints/ -!requirements.txt -!tests/ +!/endpoint/ +!/endpoints/ +!/requirements.txt +!/tests/ +!/pyproject.toml diff --git a/.github/workflows/test-endpoint.yml b/.github/workflows/test-endpoint.yml index 2f9a975..8fc63f4 100644 --- a/.github/workflows/test-endpoint.yml +++ b/.github/workflows/test-endpoint.yml @@ -70,11 +70,10 @@ jobs: env: PYTEST_ADDOPTS: "--color=yes" AUTH_TOKEN: "abcd1234" - KAFKA_HOST: "localhost" - KAFKA_PORT: 9092 KAFKA_BOOTSTRAP_SERVERS: "localhost:9092" - LOG_LEVEL: "DEBUG" + UVICORN_LOG_LEVEL: "debug" + ENDPOINT_CONFIG_URL: "tests/endpoint_config" DEBUG: 1 run: | uvicorn app:app --host 0.0.0.0 --port 8000 --proxy-headers && - pytest -v tests/test_api.py + pytest diff --git a/.gitignore b/.gitignore index dc0fcfe..79e49ba 100644 --- a/.gitignore +++ b/.gitignore @@ -159,3 +159,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
.idea/ + +# Version file generated by setuptools-scm +_version.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a572649..3c2e030 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,26 +5,7 @@ repos: - id: check-yaml - id: end-of-file-fixer - id: trailing-whitespace - - repo: https://github.com/psf/black - rev: 23.12.0 - hooks: - - id: black - repo: https://github.com/charliermarsh/ruff-pre-commit rev: 'v0.1.8' hooks: - id: ruff - - repo: https://github.com/jazzband/pip-tools - rev: 7.3.0 - hooks: - - id: pip-compile - name: pip-compile requirements.txt - args: [--strip-extras, --output-file=requirements.txt] - files: ^(pyproject\.toml|requirements\.txt)$ - - id: pip-compile - name: pip-compile requirements-test.txt - args: [--extra=test, --strip-extras, --output-file=requirements-test.txt] - files: ^(pyproject\.toml|requirements-test\.txt)$ - - id: pip-compile - name: pip-compile requirements-dev.txt - args: [--extra=dev, --strip-extras, --output-file=requirements-dev.txt] - files: ^(pyproject\.toml|requirements-dev\.txt)$ diff --git a/Dockerfile b/Dockerfile index 2b3f2dc..bdfffea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,25 +1,40 @@ -# mittaridatapumppu-endpoint +# syntax=docker/dockerfile:1 -FROM python:3.11-alpine +ARG PYTHON_VERSION="3.12" +ARG ALPINE_VERSION="3.19" + +FROM python:${PYTHON_VERSION}-alpine${ALPINE_VERSION} as build ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 - -RUN addgroup -S app && adduser -S app -G app -WORKDIR /home/app +ENV VIRTUAL_ENV=/opt/venv +ENV PATH="${VIRTUAL_ENV}/bin:$PATH" # Install requirements to build aiokafka -RUN apk add --no-cache \ - gcc \ - python3-dev \ - libc-dev \ - zlib-dev +RUN --mount=type=cache,target=/var/cache/apk \ + apk add gcc python3-dev libc-dev zlib-dev # Copy and install requirements only first to cache the dependency layer -COPY --chown=app:app requirements.txt . 
-RUN pip install --no-cache-dir --no-compile --upgrade -r requirements.txt +RUN pip install uv + +COPY pyproject.toml ./ +RUN --mount=type=cache,target=/root/.cache/uv \ +uv venv $VIRTUAL_ENV && \ +uv pip install -r pyproject.toml + +FROM python:3.12-alpine + +ENV PYTHONDONTWRITEBYTECODE 1 +ENV PYTHONUNBUFFERED 1 +ENV VIRTUAL_ENV=/opt/venv +ENV PATH="${VIRTUAL_ENV}/bin:$PATH" + +RUN addgroup -S app && adduser -S app -G app +WORKDIR /home/app -COPY --chown=app:app . . +COPY --from=build --chown=app:app $VIRTUAL_ENV $VIRTUAL_ENV +COPY --chown=app:app endpoint/ ./endpoint +COPY --chown=app:app endpoints/ ./endpoints # Support Arbitrary User IDs RUN chgrp -R 0 /home/app && \ @@ -27,5 +42,5 @@ RUN chgrp -R 0 /home/app && \ USER app -CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers"] EXPOSE 8000/tcp +CMD ["uvicorn", "endpoint.endpoint:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers"] diff --git a/README.md b/README.md index ba2e3aa..08c1e3b 100644 --- a/README.md +++ b/README.md @@ -1 +1,10 @@ # Mittaridatapumppu endpoint + +``` +pip install pip-tools pre-commit +. 
venv/bin/activate +pre-commit install +pip-sync requirements*.txt +uvicorn endpoint.endpoint:app --host 0.0.0.0 --port 8080 --proxy-headers +API_TOKEN=abcdef1234567890abcdef1234567890abcdef12 venv/bin/python tests/test_api2.py +``` diff --git a/endpoint/__init__.py b/endpoint/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app.py b/endpoint/endpoint.py similarity index 63% rename from app.py rename to endpoint/endpoint.py index 09e04c1..5880373 100755 --- a/app.py +++ b/endpoint/endpoint.py @@ -2,24 +2,25 @@ import logging import os import pprint +import json +from contextlib import asynccontextmanager import httpx from fastapi import FastAPI from fastapi.requests import Request -from fastapi.responses import Response, PlainTextResponse, JSONResponse -from fastapi.routing import APIRoute -from fvhiot.utils import init_script -from fvhiot.utils.aiokafka import ( - get_aiokafka_producer_by_envs, - on_send_success, - on_send_error, -) +from fastapi.responses import JSONResponse, PlainTextResponse, Response +from fvhiot.utils.aiokafka import (get_aiokafka_producer_by_envs, + on_send_error, on_send_success) from fvhiot.utils.data import data_pack -from fvhiot.utils.http.starlettetools import extract_data_from_starlette_request +from fvhiot.utils.http.starlettetools import \ + extract_data_from_starlette_request from sentry_asgi import SentryMiddleware from endpoints import AsyncRequestHandler as RequestHandler +app_producer = None +app_endpoints = {} + # TODO: for testing, add better defaults (or remove completely to make sure it is set in env) ENDPOINT_CONFIG_URL = os.getenv( "ENDPOINT_CONFIG_URL", "http://127.0.0.1:8000/api/v1/hosts/localhost/" @@ -35,6 +36,39 @@ } +@asynccontextmanager +async def lifespan(app: FastAPI): + # Get endpoints from Device registry and create KafkaProducer . + # TODO: Test external connections here, e.g. device registry, redis etc. and crash if some mandatory + # service is missing. 
+ global app_endpoints + global app_producer + endpoints = await get_endpoints_from_device_registry(True) + logging.debug("\n" + pprint.pformat(endpoints)) + if endpoints: + app_endpoints = endpoints + try: + app_producer = await get_aiokafka_producer_by_envs() + except Exception as e: + logging.error(f"Failed to create KafkaProducer: {e}") + app_producer = None + logging.info( + "Ready to go, listening to endpoints: {}".format( + ", ".join(endpoints.keys()) + ) + ) + yield + + # Close KafkaProducer and other connections. + logging.info("Shutdown, close connections") + if app_producer: + await app_producer.stop() + + +app = FastAPI(lifespan=lifespan) +app.add_middleware(SentryMiddleware) + + def get_full_path(request: Request) -> str: """Make sure there is always exactly one leading slash in path.""" return "/" + request.path_params["full_path"].lstrip("/") @@ -45,29 +79,35 @@ async def get_endpoints_from_device_registry(fail_on_error: bool) -> dict: Update endpoints from device registry. This is done on startup and when device registry is updated. 
""" endpoints = {} - # Create request to ENDPOINTS_URL and get data using httpx - async with httpx.AsyncClient() as client: - try: - response = await client.get( - ENDPOINT_CONFIG_URL, headers=device_registry_request_headers - ) - if response.status_code == 200: - data = response.json() - logging.info( - f"Got {len(data['endpoints'])} endpoints from device registry {ENDPOINT_CONFIG_URL}" + data = {} + if ENDPOINT_CONFIG_URL.startswith("http"): + # Create request to ENDPOINTS_URL and get data using httpx + async with httpx.AsyncClient() as client: + try: + response = await client.get( + ENDPOINT_CONFIG_URL, headers=device_registry_request_headers ) - else: + if response.status_code == 200: + data = response.json() + logging.info( + f"Got {len(data['endpoints'])} endpoints from device registry {ENDPOINT_CONFIG_URL}" + ) + else: + logging.error( + f"Failed to get endpoints from device registry {ENDPOINT_CONFIG_URL}" + ) + return endpoints + except Exception as e: logging.error( - f"Failed to get endpoints from device registry {ENDPOINT_CONFIG_URL}" + f"Failed to get endpoints from device registry {ENDPOINT_CONFIG_URL}: {e}" ) - return endpoints - except Exception as e: - logging.error( - f"Failed to get endpoints from device registry {ENDPOINT_CONFIG_URL}: {e}" - ) - if fail_on_error: - raise e + if fail_on_error: + raise e + else: + with open(ENDPOINT_CONFIG_URL, "r") as file: + return json.loads(file.read()) for endpoint in data["endpoints"]: + logging.debug(f"{endpoint}") # Import requesthandler module. It must exist in python path. 
try: request_handler_module = importlib.import_module( @@ -79,40 +119,51 @@ async def get_endpoints_from_device_registry(fail_on_error: bool) -> dict: endpoint["request_handler"] = request_handler_function logging.info(f"Imported {endpoint['http_request_handler']}") except ImportError as e: - logging.error(f"Failed to import {endpoint['http_request_handler']}: {e}") + logging.error( + f"Failed to import {endpoint['http_request_handler']}: {e}") endpoints[endpoint["endpoint_path"]] = endpoint return endpoints +@app.get("/") async def root(_request: Request) -> Response: return JSONResponse({"message": "Test ok"}) +@app.get("/notify") async def notify(_request: Request) -> Response: - global app + global app_endpoints endpoints = await get_endpoints_from_device_registry(False) logging.debug("Got endpoints:\n" + pprint.pformat(endpoints)) endpoint_count = len(endpoints) if endpoints: - logging.info(f"Got {endpoint_count} endpoints from device registry in notify") - app.endpoints = endpoints + logging.info( + f"Got {endpoint_count} endpoints from device registry in notify") + app_endpoints = endpoints return PlainTextResponse(f"OK ({endpoint_count})") +@app.get("/readiness") +@app.head("/readiness") async def readiness(_request: Request) -> Response: return PlainTextResponse("OK") -async def healthz(_request: Request) -> Response: +@app.get("/liveness") +@app.head("/liveness") +async def liveness(_request: Request) -> Response: return PlainTextResponse("OK") +@app.get("/debug-sentry") +@app.head("/debug-sentry") async def trigger_error(_request: Request) -> Response: _ = 1 / 0 return PlainTextResponse("Shouldn't reach this") async def api_v2(request: Request, endpoint: dict) -> Response: + global app_producer request_data = await extract_data_from_starlette_request( request ) # data validation done here @@ -120,7 +171,8 @@ async def api_v2(request: Request, endpoint: dict) -> Response: # DONE # logging.error(request_data) if request_data.get("extra"): - 
logging.warning(f"RequestModel contains extra values: {request_data['extra']}") + logging.warning( + f"RequestModel contains extra values: {request_data['extra']}") if request_data["request"].get("extra"): logging.warning( f"RequestData contains extra values: {request_data['request']['extra']}" @@ -130,18 +182,19 @@ "request_handler" ].process_request(request_data, endpoint) response_message = str(response_message) - print("REMOVE ME", auth_ok, device_id, topic_name, response_message, status_code) + print("REMOVE ME", auth_ok, device_id, + topic_name, response_message, status_code) # add extracted device id to request data before pushing to kafka raw data topic request_data["device_id"] = device_id # We assume device data is valid here logging.debug(pprint.pformat(request_data)) if auth_ok and topic_name: - if app.producer: + if app_producer: logging.info(f'Sending path "{path}" data to {topic_name}') - packed_data = data_pack(request_data) + packed_data = data_pack(request_data) or b"" logging.debug(packed_data[:1000]) try: - res = await app.producer.send_and_wait(topic_name, value=packed_data) + res = await app_producer.send_and_wait(topic_name, value=packed_data) on_send_success(res) except Exception as e: on_send_error(e) @@ -160,75 +213,22 @@ return PlainTextResponse(response_message, status_code=status_code or 200) +@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "HEAD", "DELETE", "PATCH"]) async def catch_all(request: Request) -> Response: """Catch all requests (except static paths) and route them to correct request handlers.""" + global app_endpoints full_path = get_full_path(request) - # print(full_path, app.endpoints.keys()) - if full_path in app.endpoints: - endpoint = app.endpoints[full_path] + # print(full_path, app_endpoints.keys()) + if full_path in app_endpoints: + endpoint = app_endpoints[full_path] 
response = await api_v2(request, endpoint) return response else: # return 404 return PlainTextResponse("Not found: " + full_path, status_code=404) -async def startup(): - """ - Get endpoints from Device registry and create KafkaProducer . - TODO: Test external connections here, e.g. device registry, redis etc. and crash if some mandatory - service is missing. - """ - global app - endpoints = await get_endpoints_from_device_registry(True) - logging.debug("\n" + pprint.pformat(endpoints)) - if endpoints: - app.endpoints = endpoints - try: - app.producer = await get_aiokafka_producer_by_envs() - except Exception as e: - logging.error(f"Failed to create KafkaProducer: {e}") - app.producer = None - logging.info( - "Ready to go, listening to endpoints: {}".format( - ", ".join(app.endpoints.keys()) - ) - ) - - -async def shutdown(): - """ - Close KafkaProducer and other connections. - """ - global app - logging.info("Shutdown, close connections") - if app.producer: - await app.producer.stop() - - -routes = [ - APIRoute("/", endpoint=root), - APIRoute("/notify", endpoint=notify, methods=["GET"]), - APIRoute("/readiness", endpoint=readiness, methods=["GET", "HEAD"]), - APIRoute("/healthz", endpoint=healthz, methods=["GET", "HEAD"]), - APIRoute("/debug-sentry", endpoint=trigger_error, methods=["GET", "HEAD"]), - APIRoute( - "/{full_path:path}", - endpoint=catch_all, - methods=["HEAD", "GET", "POST", "PUT", "PATCH", "DELETE"], - ), -] - - -init_script() -debug = True if os.getenv("DEBUG") else False -app = FastAPI(debug=debug, routes=routes, on_startup=[startup], on_shutdown=[shutdown]) -app.producer = None -app.endpoints = {} -app.add_middleware(SentryMiddleware) - # This part is for debugging / PyCharm debugger # See https://fastapi.tiangolo.com/tutorial/debugging/ if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8002) diff --git a/endpoints/__init__.py b/endpoints/__init__.py index f9f4d66..fc76368 100644 --- a/endpoints/__init__.py +++ 
b/endpoints/__init__.py @@ -1,5 +1,4 @@ import abc - import ipaddress import logging import os @@ -56,9 +55,7 @@ class AsyncRequestHandler(abc.ABC): Async version of BaseRequestHandler, compatible with Starlette, FastAPI and Device registry. """ - def __init__(self): - pass - + @abc.abstractmethod async def validate( self, request_data: dict, endpoint_data: dict ) -> Tuple[bool, Union[str, None], Union[int, None]]: diff --git a/index.html b/index.html deleted file mode 100644 index 8dfb027..0000000 --- a/index.html +++ /dev/null @@ -1,11 +0,0 @@ - - -
- - -