Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cache busting across all gunicorn/uvicorn workers #99

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
fly.staging.toml
fly.prod.toml
cache-watch-dog/
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,4 @@ offsets_db_api/_version.py
*.sqlite
*.csv
*.gz
cache-watch-dog/
7 changes: 7 additions & 0 deletions offsets_db_api/cache.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pathlib
import typing

from fastapi import Request, Response
Expand All @@ -10,6 +11,12 @@

CACHE_NAMESPACE = 'offsets-db'

app_dir = pathlib.Path(__file__).parent.parent

watch_dog_dir = app_dir / 'cache-watch-dog'
watch_dog_dir.mkdir(parents=True, exist_ok=True)
watch_dog_file = watch_dog_dir / 'last-db-update.txt'


def request_key_builder(
func: typing.Callable[..., typing.Any],
Expand Down
9 changes: 8 additions & 1 deletion offsets_db_api/logging.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import logging
import os
import sys


def get_logger() -> logging.Logger:
logger = logging.getLogger('offsets-db-api')
worker_id = os.environ.get('APP_WORKER_ID', '')

if not logger.handlers:
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter('%(levelname)s: %(name)s - %(message)s'))
if worker_id != '':
handler.setFormatter(
logging.Formatter(f'[%(name)s] [worker {worker_id}] [%(levelname)s] %(message)s')
)
else:
handler.setFormatter(logging.Formatter('[%(name)s] [%(levelname)s] %(message)s'))
logger.addHandler(handler)

logger.setLevel(logging.DEBUG)
Expand Down
21 changes: 20 additions & 1 deletion offsets_db_api/main.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
import asyncio
import os
import pathlib
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from .app_metadata import metadata
from .cache import request_key_builder
from .cache import clear_cache, request_key_builder, watch_dog_dir, watch_dog_file
from .logging import get_logger
from .routers import charts, clips, credits, files, health, projects

logger = get_logger()


class CacheInvalidationHandler(FileSystemEventHandler):
def on_modified(self, event):
event_path = pathlib.Path(event.src_path).resolve()
if event_path == watch_dog_file.resolve():
logger.info('🔄 File modified: %s', event_path)
asyncio.run(clear_cache())


@asynccontextmanager
async def lifespan_event(app: FastAPI):
"""
Expand All @@ -39,11 +51,18 @@ async def lifespan_event(app: FastAPI):
f'🔥 Cache set up with expiration={expiration:,} seconds | {cache_status_header} cache status header.'
)

event_handler = CacheInvalidationHandler()
observer = Observer()
observer.schedule(event_handler, path=str(watch_dog_dir), recursive=False)
observer.start()

yield

logger.info('Application shutdown...')
logger.info('Clearing cache...')
FastAPICache.reset()
observer.stop()
observer.join()
logger.info('👋 Goodbye!')


Expand Down
9 changes: 7 additions & 2 deletions offsets_db_api/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import datetime
import traceback

import pandas as pd
from offsets_db_data.models import clip_schema, credit_schema, project_schema
from sqlmodel import ARRAY, BigInteger, Boolean, Date, DateTime, String, text

from .cache import clear_cache
from .cache import watch_dog_file
from .logging import get_logger
from .models import File

Expand Down Expand Up @@ -144,4 +145,8 @@ async def process_files(*, engine, session, files: list[File]):

process_dataframe(clip_projects_df, 'clipproject', engine)

await clear_cache()
# modify the watch_dog_file
with open(watch_dog_file, 'w') as f:
now = datetime.datetime.utcnow()
logger.info(f'✅ Updated watch_dog_file: {watch_dog_file} to {now}')
f.write(now.strftime('%Y-%m-%d %H:%M:%S'))
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ ujson
universal_pathlib
uvicorn
fastapi-cache2 @ git+https://github.com/andersy005/fastapi-cache@pydantic-v2-compat # for pydantic v2 compatilibity
watchdog>=3.0
Loading