From 2705bf8493fc0177a4a8824ce63cfee186920c10 Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Thu, 25 Jan 2024 13:16:19 -0800 Subject: [PATCH 1/2] fix cache busting across all gunicorn/uvicorn workers --- .dockerignore | 1 + .gitignore | 1 + offsets_db_api/cache.py | 7 +++++++ offsets_db_api/logging.py | 9 ++++++++- offsets_db_api/main.py | 21 ++++++++++++++++++++- offsets_db_api/tasks.py | 9 +++++++-- 6 files changed, 44 insertions(+), 4 deletions(-) diff --git a/.dockerignore b/.dockerignore index 3f81414..bdff1a6 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,2 +1,3 @@ fly.staging.toml fly.prod.toml +cache-watch-dog/ diff --git a/.gitignore b/.gitignore index b656ae4..0dfe524 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,4 @@ offsets_db_api/_version.py *.sqlite *.csv *.gz +cache-watch-dog/ diff --git a/offsets_db_api/cache.py b/offsets_db_api/cache.py index 180053b..661ce14 100644 --- a/offsets_db_api/cache.py +++ b/offsets_db_api/cache.py @@ -1,3 +1,4 @@ +import pathlib import typing from fastapi import Request, Response @@ -10,6 +11,12 @@ CACHE_NAMESPACE = 'offsets-db' +app_dir = pathlib.Path(__file__).parent.parent + +watch_dog_dir = app_dir / 'cache-watch-dog' +watch_dog_dir.mkdir(parents=True, exist_ok=True) +watch_dog_file = watch_dog_dir / 'last-db-update.txt' + def request_key_builder( func: typing.Callable[..., typing.Any], diff --git a/offsets_db_api/logging.py b/offsets_db_api/logging.py index 7490952..9f19776 100644 --- a/offsets_db_api/logging.py +++ b/offsets_db_api/logging.py @@ -1,13 +1,20 @@ import logging +import os import sys def get_logger() -> logging.Logger: logger = logging.getLogger('offsets-db-api') + worker_id = os.environ.get('APP_WORKER_ID', '') if not logger.handlers: handler = logging.StreamHandler(stream=sys.stdout) - handler.setFormatter(logging.Formatter('%(levelname)s: %(name)s - %(message)s')) + if worker_id != '': + handler.setFormatter( + logging.Formatter(f'[%(name)s] [worker {worker_id}] [%(levelname)s] %(message)s') + ) + else: + handler.setFormatter(logging.Formatter('[%(name)s] [%(levelname)s] %(message)s')) logger.addHandler(handler) logger.setLevel(logging.DEBUG) diff --git a/offsets_db_api/main.py b/offsets_db_api/main.py index 9f3c927..617e1d1 100644 --- a/offsets_db_api/main.py +++ b/offsets_db_api/main.py @@ -1,19 +1,31 @@ +import asyncio import os +import pathlib from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi_cache import FastAPICache from fastapi_cache.backends.inmemory import InMemoryBackend +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer from .app_metadata import metadata -from .cache import request_key_builder +from .cache import clear_cache, request_key_builder, watch_dog_dir, watch_dog_file from .logging import get_logger from .routers import charts, clips, credits, files, health, projects logger = get_logger() +class CacheInvalidationHandler(FileSystemEventHandler): + def on_modified(self, event): + event_path = pathlib.Path(event.src_path).resolve() + if event_path == watch_dog_file.resolve(): + logger.info('🔄 File modified: %s', event_path) + asyncio.run(clear_cache()) + + @asynccontextmanager async def lifespan_event(app: FastAPI): """ @@ -39,11 +51,18 @@ async def lifespan_event(app: FastAPI): f'🔥 Cache set up with expiration={expiration:,} seconds | {cache_status_header} cache status header.' ) + event_handler = CacheInvalidationHandler() + observer = Observer() + observer.schedule(event_handler, path=str(watch_dog_dir), recursive=False) + observer.start() + yield logger.info('Application shutdown...') logger.info('Clearing cache...') FastAPICache.reset() + observer.stop() + observer.join() logger.info('👋 Goodbye!') diff --git a/offsets_db_api/tasks.py b/offsets_db_api/tasks.py index af828cc..37e41e4 100644 --- a/offsets_db_api/tasks.py +++ b/offsets_db_api/tasks.py @@ -1,10 +1,11 @@ +import datetime import traceback import pandas as pd from offsets_db_data.models import clip_schema, credit_schema, project_schema from sqlmodel import ARRAY, BigInteger, Boolean, Date, DateTime, String, text -from .cache import clear_cache +from .cache import watch_dog_file from .logging import get_logger from .models import File @@ -144,4 +145,8 @@ async def process_files(*, engine, session, files: list[File]): process_dataframe(clip_projects_df, 'clipproject', engine) - await clear_cache() + # modify the watch_dog_file + with open(watch_dog_file, 'w') as f: + now = datetime.datetime.utcnow() + logger.info(f'✅ Updated watch_dog_file: {watch_dog_file} to {now}') + f.write(now.strftime('%Y-%m-%d %H:%M:%S')) From bba427336831c37a3c5dfe2f5fe2ac770e332b77 Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Thu, 25 Jan 2024 13:18:02 -0800 Subject: [PATCH 2/2] Add watchdog dependency for file monitoring --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 0ed17d7..65c5299 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ ujson universal_pathlib uvicorn fastapi-cache2 @ git+https://github.com/andersy005/fastapi-cache@pydantic-v2-compat # for pydantic v2 compatilibity +watchdog>=3.0