optimize watcher_test and reduce flakes by introducing wait pattern
azuline committed Oct 25, 2023
1 parent 079c132 commit b64e879
Showing 3 changed files with 87 additions and 42 deletions.
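
The "wait pattern" named in the commit title swaps fixed time.sleep() pauses in the tests for a bounded poll loop: keep re-checking the expected cache state until it appears or a timeout expires. A minimal sketch of the idea (the helper and the condition in the usage note are illustrative, not part of this commit):

import time
from collections.abc import Callable

def wait_until(condition: Callable[[], bool], timeout_sec: float = 1.0, poll_sec: float = 0.005) -> bool:
    # Poll `condition` until it returns True or `timeout_sec` elapses.
    start = time.time()
    while time.time() - start < timeout_sec:
        if condition():
            return True
        time.sleep(poll_sec)
    return False

# e.g.: assert wait_until(lambda: release_in_cache("ilovecarly")), "cache never updated"

A passing check returns as soon as the watcher has caught up, so the common case is fast, while a slow machine still gets the full timeout; that is what cuts both the runtime and the flakes.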
Makefile: 2 changes (1 addition, 1 deletion)
@@ -7,7 +7,7 @@ test:
	pytest -n logical --cov=. .
	coverage html

-test-sync:
+test-seq:
	pytest --cov=. .
	coverage html

rose/watcher.py: 11 changes (8 additions, 3 deletions)
@@ -1,6 +1,7 @@
import asyncio
import contextlib
import logging
+import sys
import time
from dataclasses import dataclass
from pathlib import Path
@@ -24,6 +25,10 @@

logger = logging.getLogger(__name__)

+# Shorten wait times when we are running under pytest so that the tests run faster;
+# the shorter waits would just be wasteful in production.
+WAIT_DIVIDER = 1 if "pytest" not in sys.modules else 10


# Changes to releases occur across an entire directory, but change events come in at a file
# granularity. We only want to operate on a release once all files have finished changing.
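
The comment above describes the debouncing the watcher performs: file-level events for the same release are coalesced, and the cache update only runs after the directory has gone quiet. A minimal sketch of that idea, with illustrative names rather than the watcher's actual API:

import time

_last_event_at: dict[str, float] = {}
QUIET_SEC = 0.2  # illustrative quiet period, not the watcher's actual value

def note_event(release_key: str) -> None:
    # Remember when we last saw a file event for this release directory.
    _last_event_at[release_key] = time.time()

def ready_to_process(release_key: str) -> bool:
    # Only act on the release once no new file event has arrived for QUIET_SEC seconds.
    last = _last_event_at.get(release_key)
    return last is not None and time.time() - last >= QUIET_SEC

In the watcher itself this shows up as the debounce_times bookkeeping and the wait argument to handle_event further down in this diff.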
@@ -88,7 +93,7 @@ async def handle_event(
    wait: float | None = None,
) -> None:  # pragma: no cover
    if wait:
-        await asyncio.sleep(wait)
+        await asyncio.sleep(wait / WAIT_DIVIDER)

    if e.type == "created" or e.type == "modified":
        if e.collage:
@@ -112,7 +117,7 @@ async def event_processor(c: Config, queue: Queue[WatchdogEvent]) -> None:  # pragma: no cover
async def event_processor(c: Config, queue: Queue[WatchdogEvent]) -> None: # pragma: no cover
    debounce_times: dict[str, float] = {}
    while True:
-        await asyncio.sleep(0.01)
+        await asyncio.sleep(0.01 / WAIT_DIVIDER)

        try:
            event = queue.get(block=False)
@@ -139,7 +144,7 @@ async def event_processor(c: Config, queue: Queue[WatchdogEvent]) -> None:  # pragma: no cover
            logger.info(
                f"Updating cache in response to {event.type} event on release {event.release.name}"
            )
-            asyncio.create_task(handle_event(c, event, 0.2))
+            asyncio.create_task(handle_event(c, event, 0.5))


def start_watchdog(c: Config) -> None: # pragma: no cover
rose/watcher_test.py: 116 changes (78 additions, 38 deletions)
@@ -15,37 +15,57 @@ def start_watcher(c: Config) -> Iterator[None]:
    process = Process(target=start_watchdog, args=[c])
    try:
        process.start()
-        time.sleep(0.1)
+        time.sleep(0.05)
        yield
    finally:
        process.terminate()


+def retry_for_sec(timeout_sec: float) -> Iterator[None]:
+    start = time.time()
+    while True:
+        yield
+        time.sleep(0.005)
+        if time.time() - start >= timeout_sec:
+            # Return rather than raising StopIteration: under PEP 479, a StopIteration
+            # raised inside a generator becomes a RuntimeError, which would mask the
+            # AssertionError that the callers' for/else blocks are meant to raise.
+            return


def test_watchdog_events(config: Config) -> None:
    src = config.music_source_dir
    with start_watcher(config):
        # Create release.
        shutil.copytree(TEST_RELEASE_2, src / TEST_RELEASE_2.name)
-        time.sleep(0.5)
-        with connect(config) as conn:
-            cursor = conn.execute("SELECT id FROM releases")
-            assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly"}
+        for _ in retry_for_sec(1):
+            with connect(config) as conn:
+                cursor = conn.execute("SELECT id FROM releases")
+                if {r["id"] for r in cursor.fetchall()} == {"ilovecarly"}:
+                    break
+        else:
+            raise AssertionError("Failed to find release ID in cache.")

        # Create another release.
        shutil.copytree(TEST_RELEASE_3, src / TEST_RELEASE_3.name)
-        time.sleep(0.5)
-        with connect(config) as conn:
-            cursor = conn.execute("SELECT id FROM releases")
-            assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly", "ilovenewjeans"}
+        for _ in retry_for_sec(1):
+            with connect(config) as conn:
+                cursor = conn.execute("SELECT id FROM releases")
+                if {r["id"] for r in cursor.fetchall()} == {"ilovecarly", "ilovenewjeans"}:
+                    break
+        else:
+            raise AssertionError("Failed to find second release ID in cache.")

        # Create collage.
        shutil.copytree(TEST_COLLAGE_1, src / "!collages")
-        time.sleep(0.3)
-        with connect(config) as conn:
-            cursor = conn.execute("SELECT name FROM collages")
-            assert {r["name"] for r in cursor.fetchall()} == {"Rose Gold"}
-            cursor = conn.execute("SELECT release_id FROM collages_releases")
-            assert {r["release_id"] for r in cursor.fetchall()} == {"ilovecarly", "ilovenewjeans"}
+        for _ in retry_for_sec(1):
+            with connect(config) as conn:
+                cursor = conn.execute("SELECT name FROM collages")
+                if {r["name"] for r in cursor.fetchall()} != {"Rose Gold"}:
+                    continue
+                cursor = conn.execute("SELECT release_id FROM collages_releases")
+                if {r["release_id"] for r in cursor.fetchall()} != {"ilovecarly", "ilovenewjeans"}:
+                    continue
+                break
+        else:
+            raise AssertionError("Failed to find collage in cache.")

        # Create/rename/delete random files; check that they don't interfere with rest of the test.
        (src / "hi.nfo").touch()
@@ -54,36 +74,56 @@ def test_watchdog_events(config: Config) -> None:

        # Delete release.
        shutil.rmtree(src / TEST_RELEASE_3.name)
-        time.sleep(0.5)
-        with connect(config) as conn:
-            cursor = conn.execute("SELECT id FROM releases")
-            assert {r["id"] for r in cursor.fetchall()} == {"ilovecarly"}
-            cursor = conn.execute("SELECT release_id FROM collages_releases")
-            assert {r["release_id"] for r in cursor.fetchall()} == {"ilovecarly"}
+        for _ in retry_for_sec(1):
+            with connect(config) as conn:
+                cursor = conn.execute("SELECT id FROM releases")
+                if {r["id"] for r in cursor.fetchall()} != {"ilovecarly"}:
+                    continue
+                cursor = conn.execute("SELECT release_id FROM collages_releases")
+                if {r["release_id"] for r in cursor.fetchall()} != {"ilovecarly"}:
+                    continue
+                break
+        else:
+            raise AssertionError("Failed to see release deletion in cache.")

        # Rename release.
        (src / TEST_RELEASE_2.name).rename(src / "lalala")
-        time.sleep(0.5)
-        with connect(config) as conn:
-            cursor = conn.execute("SELECT id, source_path FROM releases")
-            rows = cursor.fetchall()
-            assert len(rows) == 1
-            row = rows[0]
-            assert row["id"] == "ilovecarly"
-            assert row["source_path"] == str(src / "lalala")
+        for _ in retry_for_sec(1):
+            with connect(config) as conn:
+                cursor = conn.execute("SELECT id, source_path FROM releases")
+                rows = cursor.fetchall()
+                if len(rows) != 1:
+                    continue
+                row = rows[0]
+                if row["id"] != "ilovecarly":
+                    continue
+                if row["source_path"] != str(src / "lalala"):
+                    continue
+                break
+        else:
+            raise AssertionError("Failed to see release rename in cache.")

        # Rename collage.
        (src / "!collages" / "Rose Gold.toml").rename(src / "!collages" / "Black Pink.toml")
-        time.sleep(0.5)
-        with connect(config) as conn:
-            cursor = conn.execute("SELECT name FROM collages")
-            assert {r["name"] for r in cursor.fetchall()} == {"Black Pink"}
-            cursor = conn.execute("SELECT release_id FROM collages_releases")
-            assert {r["release_id"] for r in cursor.fetchall()} == {"ilovecarly"}
+        for _ in retry_for_sec(1):
+            with connect(config) as conn:
+                cursor = conn.execute("SELECT name FROM collages")
+                if {r["name"] for r in cursor.fetchall()} != {"Black Pink"}:
+                    continue
+                cursor = conn.execute("SELECT release_id FROM collages_releases")
+                if {r["release_id"] for r in cursor.fetchall()} != {"ilovecarly"}:
+                    continue
+                break
+        else:
+            raise AssertionError("Failed to see collage rename in cache.")

        # Delete collage.
        (src / "!collages" / "Black Pink.toml").unlink()
-        time.sleep(0.5)
-        with connect(config) as conn:
-            cursor = conn.execute("SELECT COUNT(*) FROM collages")
-            assert cursor.fetchone()[0] == 0
+        for _ in retry_for_sec(1):
+            with connect(config) as conn:
+                cursor = conn.execute("SELECT COUNT(*) FROM collages")
+                if cursor.fetchone()[0] == 0:
+                    break
+        else:
+            raise AssertionError("Failed to see collage deletion in cache.")
