diff --git a/rose/watcher.py b/rose/watcher.py index 9ff1c36..422fabd 100644 --- a/rose/watcher.py +++ b/rose/watcher.py @@ -1,18 +1,19 @@ +import asyncio +import contextlib import logging +import time from dataclasses import dataclass +from multiprocessing import Process from pathlib import Path +from queue import Empty, Queue +from typing import Literal from watchdog.events import ( - DirCreatedEvent, - DirDeletedEvent, - FileCreatedEvent, - FileDeletedEvent, - FileModifiedEvent, + FileSystemEvent, FileSystemEventHandler, FileSystemMovedEvent, ) from watchdog.observers import Observer -from watchdog.observers.api import BaseObserver from rose.cache import ( update_cache_evict_nonexistent_collages, @@ -25,90 +26,139 @@ logger = logging.getLogger(__name__) -@dataclass -class AffectedEntity: - release: Path | None = None +# Changes to releases occur across an entire directory, but change events come in at a file +# granularity. We only want to operate on a release once all files have finished changing. +# Otherwise, we may observe effects like a `.rose.{uuid}.toml` file being created for a "new" +# release, and afterwards an existing `.rose.{uuid}.toml` file gets copied in by the trailing +# filesystem operation. +# +# Therefore, we architect the watcher like so: +# +# Process +# Thread -> watchdog/inotify listener that enqueues events +# Event Loop -> processes+debounces events asynchronously from the queue + + +EventType = Literal["created", "deleted", "modified", "moved"] +EVENT_TYPES: list[EventType] = ["created", "deleted", "modified", "moved"] + + +@dataclass(frozen=True) +class WatchdogEvent: + type: EventType collage: str | None = None + release: Path | None = None -def parse_affected_entity(config: Config, path: str) -> AffectedEntity | None: - relative_path = path.removeprefix(str(config.music_source_dir) + "/") - if relative_path.startswith("!collages/"): - if not relative_path.endswith(".toml"): - return None - collage = relative_path.removeprefix("!collages/").removesuffix(".toml") - logger.debug(f"Parsed change event on collage {collage}") - return AffectedEntity(collage=collage) - try: - release_dir = config.music_source_dir / Path(relative_path).parts[0] - logger.debug(f"Parsed event on release {release_dir}") - return AffectedEntity(release=release_dir) - except IndexError: - return None - - -class EventHandler(FileSystemEventHandler): - def __init__(self, config: Config): +class EventHandler(FileSystemEventHandler): # pragma: no cover + def __init__(self, config: Config, queue: Queue[WatchdogEvent]): super().__init__() self.config = config + self.queue = queue - def on_created(self, event: FileCreatedEvent | DirCreatedEvent) -> None: - super().on_created(event) # type: ignore - logger.debug(f"Notified of change event for {event.src_path}") - affected = parse_affected_entity(self.config, event.src_path) - if not affected: - return - if affected.collage: - update_cache_for_collages(self.config, [affected.collage]) - elif affected.release: - update_cache_for_releases(self.config, [affected.release]) - - def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None: - super().on_deleted(event) # type: ignore - logger.debug(f"Notified of change event for {event.src_path}") - affected = parse_affected_entity(self.config, event.src_path) - if not affected: - return - if affected.collage: - update_cache_evict_nonexistent_collages(self.config) - elif affected.release: - update_cache_evict_nonexistent_releases(self.config) - - def on_modified(self, event: FileModifiedEvent) -> None: - super().on_modified(event) # type: ignore - logger.debug(f"Notified of change event for {event.src_path}") - affected = parse_affected_entity(self.config, event.src_path) - if not affected: - return - if affected.collage: - update_cache_for_collages(self.config, [affected.collage]) - elif affected.release: - update_cache_for_releases(self.config, [affected.release]) - - def on_moved(self, event: FileSystemMovedEvent) -> None: - super().on_moved(event) # type: ignore - logger.debug(f"Notified of change event for {event.src_path}") - affected = parse_affected_entity(self.config, event.dest_path) - if not affected: + def on_any_event(self, event: FileSystemEvent) -> None: + super().on_any_event(event) # type: ignore + path = event.dest_path if isinstance(event, FileSystemMovedEvent) else event.src_path + logger.debug(f"Notified of {event.event_type} event for {path}") + + etype: EventType = event.event_type # type: ignore + if etype not in EVENT_TYPES: return - if affected.collage: - update_cache_for_collages(self.config, [affected.collage]) - update_cache_evict_nonexistent_collages(self.config) - elif affected.release: - update_cache_for_releases(self.config, [affected.release]) - update_cache_evict_nonexistent_releases(self.config) + relative_path = path.removeprefix(str(self.config.music_source_dir) + "/") + if relative_path.startswith("!collages/"): + if not relative_path.endswith(".toml"): + return + collage = relative_path.removeprefix("!collages/").removesuffix(".toml") + logger.debug(f"Queueing {etype} event on collage {collage}") + self.queue.put(WatchdogEvent(collage=collage, type=etype)) + return -def create_watchdog_observer(c: Config) -> BaseObserver: + with contextlib.suppress(IndexError): + final_path_part = Path(relative_path).parts[0] + if final_path_part == "/": + return + release_dir = self.config.music_source_dir / final_path_part + logger.debug(f"Queueing {etype} event on release {release_dir}") + self.queue.put(WatchdogEvent(release=release_dir, type=etype)) + + +async def handle_event( + c: Config, + e: WatchdogEvent, + wait: float | None = None, +) -> None: # pragma: no cover + if wait: + await asyncio.sleep(wait) + + if e.type == "created" or e.type == "modified": + if e.collage: + update_cache_for_collages(c, [e.collage]) + elif e.release: + update_cache_for_releases(c, [e.release]) + elif e.type == "deleted": + if e.collage: + update_cache_evict_nonexistent_collages(c) + elif e.release: + update_cache_evict_nonexistent_releases(c) + elif e.type == "moved": + if e.collage: + update_cache_for_collages(c, [e.collage]) + update_cache_evict_nonexistent_collages(c) + elif e.release: + update_cache_for_releases(c, [e.release]) + update_cache_evict_nonexistent_releases(c) + + +async def event_processor(c: Config, queue: Queue[WatchdogEvent]) -> None: # pragma: no cover + debounce_times: dict[str, float] = {} + while True: + await asyncio.sleep(0.01) + + try: + event = queue.get(block=False) + except Empty: + continue + + if event.collage: + logger.info( + f"Updating cache in response to {event.type} event on collage {event.collage}" + ) + await handle_event(c, event) + continue + + assert event.release is not None + # Debounce releases. Reason is documented at top of module. + key = event.type + "|" + str(event.release) + last = debounce_times.get(key, None) + if last and time.time() - last < 0.2: + logger.debug(f"Skipped event {key} due to debouncer") + continue + debounce_times[key] = time.time() + # Launch the handler with the sleep asynchronously. This allows us to not block the main + # thread, but insert a delay before processing the release. + logger.info( + f"Updating cache in response to {event.type} event on release {event.release.name}" + ) + asyncio.create_task(handle_event(c, event, 0.2)) + + +def watchdog_main(c: Config) -> None: # pragma: no cover + queue: Queue[WatchdogEvent] = Queue() observer = Observer() - event_handler = EventHandler(c) + event_handler = EventHandler(c, queue) observer.schedule(event_handler, c.music_source_dir, recursive=True) # type: ignore - return observer + observer.start() # type: ignore + asyncio.run(event_processor(c, queue)) + + +def create_watchdog_process(c: Config) -> Process: + return Process(target=watchdog_main, args=[c]) def start_watchdog(c: Config, foreground: bool = False) -> None: # pragma: no cover logger.info("Starting cache watchdog") - thread = create_watchdog_observer(c) - thread.start() # type: ignore + process = create_watchdog_process(c) + process.start() if foreground: - thread.join() + process.join() diff --git a/rose/watcher_test.py b/rose/watcher_test.py index 2d2ded6..f661756 100644 --- a/rose/watcher_test.py +++ b/rose/watcher_test.py @@ -6,18 +6,18 @@ from conftest import TEST_COLLAGE_1, TEST_RELEASE_2, TEST_RELEASE_3 from rose.cache import connect from rose.config import Config -from rose.watcher import create_watchdog_observer +from rose.watcher import create_watchdog_process @contextmanager def start_watcher(c: Config) -> Iterator[None]: - observer = create_watchdog_observer(c) + process = create_watchdog_process(c) try: - observer.start() # type: ignore - time.sleep(0.05) + process.start() + time.sleep(0.1) yield finally: - observer.stop() # type: ignore + process.terminate() def test_watchdog_events(config: Config) -> None: @@ -25,21 +25,21 @@ def test_watchdog_events(config: Config) -> None: with start_watcher(config): # Create release. shutil.copytree(TEST_RELEASE_2, src / TEST_RELEASE_2.name) - time.sleep(0.05) + time.sleep(0.4) with connect(config) as conn: cursor = conn.execute("SELECT id FROM releases") assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly"} # Create another release. shutil.copytree(TEST_RELEASE_3, src / TEST_RELEASE_3.name) - time.sleep(0.05) + time.sleep(0.4) with connect(config) as conn: cursor = conn.execute("SELECT id FROM releases") assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly", "ilovenewjeans"} # Create collage. shutil.copytree(TEST_COLLAGE_1, src / "!collages") - time.sleep(0.05) + time.sleep(0.2) with connect(config) as conn: cursor = conn.execute("SELECT name FROM collages") assert {r["name"] for r in cursor.fetchall()} == {"Rose Gold"} @@ -53,7 +53,7 @@ def test_watchdog_events(config: Config) -> None: # Delete release. shutil.rmtree(src / TEST_RELEASE_3.name) - time.sleep(0.05) + time.sleep(0.4) with connect(config) as conn: cursor = conn.execute("SELECT id FROM releases") assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly"} @@ -62,7 +62,7 @@ def test_watchdog_events(config: Config) -> None: # Rename release. (src / TEST_RELEASE_2.name).rename(src / "lalala") - time.sleep(0.05) + time.sleep(0.4) with connect(config) as conn: cursor = conn.execute("SELECT id, source_path FROM releases") rows = cursor.fetchall() @@ -73,7 +73,7 @@ def test_watchdog_events(config: Config) -> None: # Rename collage. (src / "!collages" / "Rose Gold.toml").rename(src / "!collages" / "Black Pink.toml") - time.sleep(0.05) + time.sleep(0.2) with connect(config) as conn: cursor = conn.execute("SELECT name FROM collages") assert {r["name"] for r in cursor.fetchall()} == {"Black Pink"} @@ -82,7 +82,7 @@ def test_watchdog_events(config: Config) -> None: # Delete collage. (src / "!collages" / "Black Pink.toml").unlink() - time.sleep(0.05) + time.sleep(0.2) with connect(config) as conn: cursor = conn.execute("SELECT COUNT(*) FROM collages") assert cursor.fetchone()[0] == 0