Skip to content

Commit

Permalink
fix race condition in watchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
azuline committed Oct 18, 2023
1 parent 5d8a29e commit 518632d
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 89 deletions.
204 changes: 127 additions & 77 deletions rose/watcher.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import asyncio
import contextlib
import logging
import time
from dataclasses import dataclass
from multiprocessing import Process
from pathlib import Path
from queue import Empty, Queue
from typing import Literal

from watchdog.events import (
DirCreatedEvent,
DirDeletedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileSystemEvent,
FileSystemEventHandler,
FileSystemMovedEvent,
)
from watchdog.observers import Observer
from watchdog.observers.api import BaseObserver

from rose.cache import (
update_cache_evict_nonexistent_collages,
Expand All @@ -25,90 +26,139 @@
logger = logging.getLogger(__name__)


@dataclass
class AffectedEntity:
release: Path | None = None
# Changes to releases occur across an entire directory, but change events come in at a file
# granularity. We only want to operate on a release once all files have finished changing.
# Otherwise, we may observe effects like a `.rose.{uuid}.toml` file being created for a "new"
# release, and afterwards an existing `.rose.{uuid}.toml` file gets copied in by the trailing
# filesystem operation.
#
# Therefore, we architect the watcher like so:
#
# Process
# Thread -> watchdog/inotify listener that enqueues events
# Event Loop -> processes+debounces events asynchronously from the queue


EventType = Literal["created", "deleted", "modified", "moved"]
EVENT_TYPES: list[EventType] = ["created", "deleted", "modified", "moved"]


@dataclass(frozen=True)
class WatchdogEvent:
type: EventType
collage: str | None = None
release: Path | None = None


def parse_affected_entity(config: Config, path: str) -> AffectedEntity | None:
relative_path = path.removeprefix(str(config.music_source_dir) + "/")
if relative_path.startswith("!collages/"):
if not relative_path.endswith(".toml"):
return None
collage = relative_path.removeprefix("!collages/").removesuffix(".toml")
logger.debug(f"Parsed change event on collage {collage}")
return AffectedEntity(collage=collage)
try:
release_dir = config.music_source_dir / Path(relative_path).parts[0]
logger.debug(f"Parsed event on release {release_dir}")
return AffectedEntity(release=release_dir)
except IndexError:
return None


class EventHandler(FileSystemEventHandler):
def __init__(self, config: Config):
class EventHandler(FileSystemEventHandler): # pragma: no cover
def __init__(self, config: Config, queue: Queue[WatchdogEvent]):
super().__init__()
self.config = config
self.queue = queue

def on_created(self, event: FileCreatedEvent | DirCreatedEvent) -> None:
super().on_created(event) # type: ignore
logger.debug(f"Notified of change event for {event.src_path}")
affected = parse_affected_entity(self.config, event.src_path)
if not affected:
return
if affected.collage:
update_cache_for_collages(self.config, [affected.collage])
elif affected.release:
update_cache_for_releases(self.config, [affected.release])

def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None:
super().on_deleted(event) # type: ignore
logger.debug(f"Notified of change event for {event.src_path}")
affected = parse_affected_entity(self.config, event.src_path)
if not affected:
return
if affected.collage:
update_cache_evict_nonexistent_collages(self.config)
elif affected.release:
update_cache_evict_nonexistent_releases(self.config)

def on_modified(self, event: FileModifiedEvent) -> None:
super().on_modified(event) # type: ignore
logger.debug(f"Notified of change event for {event.src_path}")
affected = parse_affected_entity(self.config, event.src_path)
if not affected:
return
if affected.collage:
update_cache_for_collages(self.config, [affected.collage])
elif affected.release:
update_cache_for_releases(self.config, [affected.release])

def on_moved(self, event: FileSystemMovedEvent) -> None:
super().on_moved(event) # type: ignore
logger.debug(f"Notified of change event for {event.src_path}")
affected = parse_affected_entity(self.config, event.dest_path)
if not affected:
def on_any_event(self, event: FileSystemEvent) -> None:
super().on_any_event(event) # type: ignore
path = event.dest_path if isinstance(event, FileSystemMovedEvent) else event.src_path
logger.debug(f"Notified of {event.event_type} event for {path}")

etype: EventType = event.event_type # type: ignore
if etype not in EVENT_TYPES:
return
if affected.collage:
update_cache_for_collages(self.config, [affected.collage])
update_cache_evict_nonexistent_collages(self.config)
elif affected.release:
update_cache_for_releases(self.config, [affected.release])
update_cache_evict_nonexistent_releases(self.config)

relative_path = path.removeprefix(str(self.config.music_source_dir) + "/")
if relative_path.startswith("!collages/"):
if not relative_path.endswith(".toml"):
return
collage = relative_path.removeprefix("!collages/").removesuffix(".toml")
logger.debug(f"Queueing {etype} event on collage {collage}")
self.queue.put(WatchdogEvent(collage=collage, type=etype))
return

def create_watchdog_observer(c: Config) -> BaseObserver:
with contextlib.suppress(IndexError):
final_path_part = Path(relative_path).parts[0]
if final_path_part == "/":
return
release_dir = self.config.music_source_dir / final_path_part
logger.debug(f"Queueing {etype} event on release {release_dir}")
self.queue.put(WatchdogEvent(release=release_dir, type=etype))


async def handle_event(
c: Config,
e: WatchdogEvent,
wait: float | None = None,
) -> None: # pragma: no cover
if wait:
await asyncio.sleep(wait)

if e.type == "created" or e.type == "modified":
if e.collage:
update_cache_for_collages(c, [e.collage])
elif e.release:
update_cache_for_releases(c, [e.release])
elif e.type == "deleted":
if e.collage:
update_cache_evict_nonexistent_collages(c)
elif e.release:
update_cache_evict_nonexistent_releases(c)
elif e.type == "moved":
if e.collage:
update_cache_for_collages(c, [e.collage])
update_cache_evict_nonexistent_collages(c)
elif e.release:
update_cache_for_releases(c, [e.release])
update_cache_evict_nonexistent_releases(c)


async def event_processor(c: Config, queue: Queue[WatchdogEvent]) -> None: # pragma: no cover
debounce_times: dict[str, float] = {}
while True:
await asyncio.sleep(0.01)

try:
event = queue.get(block=False)
except Empty:
continue

if event.collage:
logger.info(
f"Updating cache in response to {event.type} event on collage {event.collage}"
)
await handle_event(c, event)
continue

assert event.release is not None
# Debounce releases. Reason is documented at top of module.
key = event.type + "|" + str(event.release)
last = debounce_times.get(key, None)
if last and time.time() - last < 0.2:
logger.debug(f"Skipped event {key} due to debouncer")
continue
debounce_times[key] = time.time()
# Launch the handler with the sleep asynchronously. This allows us to not block the main
# thread, but insert a delay before processing the release.
logger.info(
f"Updating cache in response to {event.type} event on release {event.release.name}"
)
asyncio.create_task(handle_event(c, event, 0.2))


def watchdog_main(c: Config) -> None: # pragma: no cover
queue: Queue[WatchdogEvent] = Queue()
observer = Observer()
event_handler = EventHandler(c)
event_handler = EventHandler(c, queue)
observer.schedule(event_handler, c.music_source_dir, recursive=True) # type: ignore
return observer
observer.start() # type: ignore
asyncio.run(event_processor(c, queue))


def create_watchdog_process(c: Config) -> Process:
return Process(target=watchdog_main, args=[c])


def start_watchdog(c: Config, foreground: bool = False) -> None: # pragma: no cover
logger.info("Starting cache watchdog")
thread = create_watchdog_observer(c)
thread.start() # type: ignore
process = create_watchdog_process(c)
process.start()
if foreground:
thread.join()
process.join()
24 changes: 12 additions & 12 deletions rose/watcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,40 @@
from conftest import TEST_COLLAGE_1, TEST_RELEASE_2, TEST_RELEASE_3
from rose.cache import connect
from rose.config import Config
from rose.watcher import create_watchdog_observer
from rose.watcher import create_watchdog_process


@contextmanager
def start_watcher(c: Config) -> Iterator[None]:
observer = create_watchdog_observer(c)
process = create_watchdog_process(c)
try:
observer.start() # type: ignore
time.sleep(0.05)
process.start()
time.sleep(0.1)
yield
finally:
observer.stop() # type: ignore
process.terminate()


def test_watchdog_events(config: Config) -> None:
src = config.music_source_dir
with start_watcher(config):
# Create release.
shutil.copytree(TEST_RELEASE_2, src / TEST_RELEASE_2.name)
time.sleep(0.05)
time.sleep(0.4)
with connect(config) as conn:
cursor = conn.execute("SELECT id FROM releases")
assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly"}

# Create another release.
shutil.copytree(TEST_RELEASE_3, src / TEST_RELEASE_3.name)
time.sleep(0.05)
time.sleep(0.4)
with connect(config) as conn:
cursor = conn.execute("SELECT id FROM releases")
assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly", "ilovenewjeans"}

# Create collage.
shutil.copytree(TEST_COLLAGE_1, src / "!collages")
time.sleep(0.05)
time.sleep(0.2)
with connect(config) as conn:
cursor = conn.execute("SELECT name FROM collages")
assert {r["name"] for r in cursor.fetchall()} == {"Rose Gold"}
Expand All @@ -53,7 +53,7 @@ def test_watchdog_events(config: Config) -> None:

# Delete release.
shutil.rmtree(src / TEST_RELEASE_3.name)
time.sleep(0.05)
time.sleep(0.4)
with connect(config) as conn:
cursor = conn.execute("SELECT id FROM releases")
assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly"}
Expand All @@ -62,7 +62,7 @@ def test_watchdog_events(config: Config) -> None:

# Rename release.
(src / TEST_RELEASE_2.name).rename(src / "lalala")
time.sleep(0.05)
time.sleep(0.4)
with connect(config) as conn:
cursor = conn.execute("SELECT id, source_path FROM releases")
rows = cursor.fetchall()
Expand All @@ -73,7 +73,7 @@ def test_watchdog_events(config: Config) -> None:

# Rename collage.
(src / "!collages" / "Rose Gold.toml").rename(src / "!collages" / "Black Pink.toml")
time.sleep(0.05)
time.sleep(0.2)
with connect(config) as conn:
cursor = conn.execute("SELECT name FROM collages")
assert {r["name"] for r in cursor.fetchall()} == {"Black Pink"}
Expand All @@ -82,7 +82,7 @@ def test_watchdog_events(config: Config) -> None:

# Delete collage.
(src / "!collages" / "Black Pink.toml").unlink()
time.sleep(0.05)
time.sleep(0.2)
with connect(config) as conn:
cursor = conn.execute("SELECT COUNT(*) FROM collages")
assert cursor.fetchone()[0] == 0

0 comments on commit 518632d

Please sign in to comment.