diff --git a/README.md b/README.md index 531cf75..80e58d2 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,9 @@ _Work in Progress. See [Issue #1](https://github.com/azuline/rose/issues/1) for the current state._ +**WARNING: Rosé modifies your audio files. If you do not want to modify your +audio files, you should not use Rosé.** + A virtual filesystem for music and metadata improvement tooling. ## The Virtual Filesystem diff --git a/rose/__main__.py b/rose/__main__.py index 528c558..d2168bb 100644 --- a/rose/__main__.py +++ b/rose/__main__.py @@ -13,7 +13,7 @@ edit_collage_in_editor, ) from rose.config import Config -from rose.releases import dump_releases, toggle_release_new +from rose.releases import dump_releases, edit_release, toggle_release_new from rose.virtualfs import mount_virtualfs, unmount_virtualfs from rose.watcher import start_watchdog @@ -102,6 +102,14 @@ def print1(ctx: Context) -> None: print(dump_releases(ctx.config)) +@releases.command(name="edit") +@click.argument("release", type=str, nargs=1) +@click.pass_obj +def edit2(ctx: Context, release: str) -> None: + """Edit a release's metadata in $EDITOR.""" + edit_release(ctx.config, release) + + @releases.command() @click.argument("release", type=str, nargs=1) @click.pass_obj diff --git a/rose/artiststr.py b/rose/artiststr.py index 88e1d32..e8f2ae4 100644 --- a/rose/artiststr.py +++ b/rose/artiststr.py @@ -9,7 +9,7 @@ @dataclass -class Artists: +class ArtistMapping: main: list[str] = field(default_factory=list) guest: list[str] = field(default_factory=list) remixer: list[str] = field(default_factory=list) @@ -26,7 +26,7 @@ def parse_artist_string( conductor: str | None = None, producer: str | None = None, dj: str | None = None, -) -> Artists: +) -> ArtistMapping: def _split_tag(t: str | None) -> list[str]: return TAG_SPLITTER_REGEX.split(t) if t else [] @@ -54,7 +54,7 @@ def _split_tag(t: str | None) -> list[str]: if main: li_main.extend(_split_tag(main)) - rval = Artists( + rval = ArtistMapping( main=_deduplicate(li_main), guest=_deduplicate(li_guests), remixer=_deduplicate(li_remixer), @@ -68,7 +68,7 @@ def _split_tag(t: str | None) -> list[str]: return rval -def format_artist_string(a: Artists, genres: list[str]) -> str: +def format_artist_string(a: ArtistMapping, genres: list[str]) -> str: r = ";".join(a.main) if a.composer and "Classical" in genres: r = ";".join(a.composer) + " performed by " + r diff --git a/rose/artiststr_test.py b/rose/artiststr_test.py index 5854136..0bb1f01 100644 --- a/rose/artiststr_test.py +++ b/rose/artiststr_test.py @@ -1,40 +1,43 @@ -from rose.artiststr import Artists, format_artist_string, parse_artist_string +from rose.artiststr import ArtistMapping, format_artist_string, parse_artist_string def test_parse_artist_string() -> None: - assert parse_artist_string("A;B feat. C;D") == Artists( + assert parse_artist_string("A;B feat. C;D") == ArtistMapping( main=["A", "B"], guest=["C", "D"], ) - assert parse_artist_string("A pres. C;D") == Artists( + assert parse_artist_string("A pres. C;D") == ArtistMapping( djmixer=["A"], main=["C", "D"], ) - assert parse_artist_string("A performed by C;D") == Artists( + assert parse_artist_string("A performed by C;D") == ArtistMapping( composer=["A"], main=["C", "D"], ) - assert parse_artist_string("A pres. B;C feat. D;E") == Artists( + assert parse_artist_string("A pres. B;C feat. D;E") == ArtistMapping( djmixer=["A"], main=["B", "C"], guest=["D", "E"], ) # Test the deduplication handling. - assert parse_artist_string("A pres. B", dj="A") == Artists( + assert parse_artist_string("A pres. B", dj="A") == ArtistMapping( djmixer=["A"], main=["B"], ) def test_format_artist_string() -> None: - assert format_artist_string(Artists(main=["A", "B"], guest=["C", "D"]), []) == "A;B feat. C;D" - assert format_artist_string(Artists(djmixer=["A"], main=["C", "D"]), []) == "A pres. C;D" - assert format_artist_string(Artists(composer=["A"], main=["C", "D"]), []) == "C;D" assert ( - format_artist_string(Artists(composer=["A"], main=["C", "D"]), ["Classical"]) + format_artist_string(ArtistMapping(main=["A", "B"], guest=["C", "D"]), []) + == "A;B feat. C;D" + ) + assert format_artist_string(ArtistMapping(djmixer=["A"], main=["C", "D"]), []) == "A pres. C;D" + assert format_artist_string(ArtistMapping(composer=["A"], main=["C", "D"]), []) == "C;D" + assert ( + format_artist_string(ArtistMapping(composer=["A"], main=["C", "D"]), ["Classical"]) == "A performed by C;D" ) assert ( - format_artist_string(Artists(djmixer=["A"], main=["B", "C"], guest=["D", "E"]), []) + format_artist_string(ArtistMapping(djmixer=["A"], main=["B", "C"], guest=["D", "E"]), []) == "A pres. B;C feat. D;E" ) diff --git a/rose/cache.py b/rose/cache.py index c8976c0..1e53561 100644 --- a/rose/cache.py +++ b/rose/cache.py @@ -113,7 +113,7 @@ class CachedRelease: datafile_mtime: str virtual_dirname: str title: str - type: str + releasetype: str year: int | None new: bool multidisc: bool @@ -389,7 +389,7 @@ def _update_cache_for_releases_executor( datafile_mtime=row["datafile_mtime"], virtual_dirname=row["virtual_dirname"], title=row["title"], - type=row["release_type"], + releasetype=row["release_type"], year=row["release_year"], multidisc=bool(row["multidisc"]), new=bool(row["new"]), @@ -509,7 +509,7 @@ def _update_cache_for_releases_executor( added_at="", virtual_dirname="", title="", - type="", + releasetype="", year=None, new=True, multidisc=False, @@ -635,9 +635,9 @@ def _update_cache_for_releases_executor( if tags.release_type and tags.release_type.lower() in SUPPORTED_RELEASE_TYPES else "unknown" ) - if release_type != release.type: + if release_type != release.releasetype: logger.debug(f"Release type change detected for {source_path}, updating") - release.type = release_type + release.releasetype = release_type release_dirty = True if tags.year != release.year: @@ -680,8 +680,8 @@ def _update_cache_for_releases_executor( if release.year: release_virtual_dirname += str(release.year) + ". " release_virtual_dirname += release.title - if release.type not in ["album", "unknown"]: - release_virtual_dirname += " - " + release.type.title() + if release.releasetype not in ["album", "unknown"]: + release_virtual_dirname += " - " + release.releasetype.title() if release.genres: release_virtual_dirname += " [" + ";".join(release.genres) + "]" if release.labels: @@ -716,9 +716,22 @@ def _update_cache_for_releases_executor( release.virtual_dirname = release_virtual_dirname release_dirty = True + # Here we compute the track ID. We store the track ID on the audio file in order to + # enable persistence. This does mutate the file! + # + # We don't attempt to optimize this write; however, there is not much purpose to doing + # so, since this occurs once over the lifetime of the track's existence in Rose. We + # optimize this function because it is called repeatedly upon every metadata edit, but + # in this case, we skip this code path once an ID is generated. + track_id = tags.id + if not track_id: + track_id = str(uuid6.uuid7()) + tags.id = track_id + tags.flush() + # And now create the cached track. track = CachedTrack( - id=str(uuid6.uuid7()), + id=track_id, source_path=track_path, source_mtime=track_mtime, virtual_filename="", @@ -755,7 +768,7 @@ def _update_cache_for_releases_executor( if t.track_number: virtual_filename += f"{t.track_number:0>2}. " virtual_filename += t.title or "Unknown Title" - if release.type in ["compilation", "soundtrack", "remix", "djmix", "mixtape"]: + if release.releasetype in ["compilation", "soundtrack", "remix", "djmix", "mixtape"]: virtual_filename += f" (by {t.formatted_artists})" virtual_filename += t.source_path.suffix virtual_filename = _sanitize_filename(virtual_filename) @@ -795,7 +808,7 @@ def _update_cache_for_releases_executor( release.datafile_mtime, release.virtual_dirname, release.title, - release.type, + release.releasetype, release.year, release.multidisc, release.new, @@ -1229,7 +1242,7 @@ def list_releases( datafile_mtime=row["datafile_mtime"], virtual_dirname=row["virtual_dirname"], title=row["title"], - type=row["release_type"], + releasetype=row["release_type"], year=row["release_year"], multidisc=bool(row["multidisc"]), new=bool(row["new"]), @@ -1240,16 +1253,87 @@ def list_releases( ) -@dataclass -class ReleaseFiles: - tracks: list[CachedTrack] - cover: Path | None - - -def get_release_files(c: Config, release_virtual_dirname: str) -> ReleaseFiles: - rf = ReleaseFiles(tracks=[], cover=None) - +def get_release( + c: Config, + release_id_or_virtual_dirname: str, +) -> tuple[CachedRelease, list[CachedTrack]] | None: with connect(c) as conn: + cursor = conn.execute( + r""" + WITH genres AS ( + SELECT + release_id + , GROUP_CONCAT(genre, ' \\ ') AS genres + FROM (SELECT * FROM releases_genres ORDER BY genre) + GROUP BY release_id + ), labels AS ( + SELECT + release_id + , GROUP_CONCAT(label, ' \\ ') AS labels + FROM (SELECT * FROM releases_labels ORDER BY label) + GROUP BY release_id + ), artists AS ( + SELECT + release_id + , GROUP_CONCAT(artist, ' \\ ') AS names + , GROUP_CONCAT(role, ' \\ ') AS roles + FROM (SELECT * FROM releases_artists ORDER BY artist, role) + GROUP BY release_id + ) + SELECT + r.id + , r.source_path + , r.cover_image_path + , r.added_at + , r.datafile_mtime + , r.virtual_dirname + , r.title + , r.release_type + , r.release_year + , r.multidisc + , r.new + , r.formatted_artists + , COALESCE(g.genres, '') AS genres + , COALESCE(l.labels, '') AS labels + , COALESCE(a.names, '') AS artist_names + , COALESCE(a.roles, '') AS artist_roles + FROM releases r + LEFT JOIN genres g ON g.release_id = r.id + LEFT JOIN labels l ON l.release_id = r.id + LEFT JOIN artists a ON a.release_id = r.id + WHERE r.id = ? or r.virtual_dirname = ? + """, + (release_id_or_virtual_dirname, release_id_or_virtual_dirname), + ) + row = cursor.fetchone() + if not row: + return None + rartists: list[CachedArtist] = [] + for n, r in zip(row["artist_names"].split(r" \\ "), row["artist_roles"].split(r" \\ ")): + if not n: + # This can occur if there are no artist names; then we get a single iteration + # with empty string. + continue + rartists.append(CachedArtist(name=n, role=r)) + release = CachedRelease( + id=row["id"], + source_path=Path(row["source_path"]), + cover_image_path=Path(row["cover_image_path"]) if row["cover_image_path"] else None, + added_at=row["added_at"], + datafile_mtime=row["datafile_mtime"], + virtual_dirname=row["virtual_dirname"], + title=row["title"], + releasetype=row["release_type"], + year=row["release_year"], + multidisc=bool(row["multidisc"]), + new=bool(row["new"]), + genres=row["genres"].split(r" \\ ") if row["genres"] else [], + labels=row["labels"].split(r" \\ ") if row["labels"] else [], + artists=rartists, + formatted_artists=row["formatted_artists"], + ) + + tracks: list[CachedTrack] = [] cursor = conn.execute( r""" WITH artists AS ( @@ -1276,19 +1360,20 @@ def get_release_files(c: Config, release_virtual_dirname: str) -> ReleaseFiles: FROM tracks t JOIN releases r ON r.id = t.release_id LEFT JOIN artists a ON a.track_id = t.id - WHERE r.virtual_dirname = ? + WHERE r.id = ? OR r.virtual_dirname = ? + ORDER BY t.disc_number, t.track_number """, - (release_virtual_dirname,), + (release_id_or_virtual_dirname, release_id_or_virtual_dirname), ) for row in cursor: - artists: list[CachedArtist] = [] + tartists: list[CachedArtist] = [] for n, r in zip(row["artist_names"].split(r" \\ "), row["artist_roles"].split(r" \\ ")): if not n: # This can occur if there are no artist names; then we get a single iteration # with empty string. continue - artists.append(CachedArtist(name=n, role=r)) - rf.tracks.append( + tartists.append(CachedArtist(name=n, role=r)) + tracks.append( CachedTrack( id=row["id"], source_path=Path(row["source_path"]), @@ -1300,18 +1385,11 @@ def get_release_files(c: Config, release_virtual_dirname: str) -> ReleaseFiles: disc_number=row["disc_number"], duration_seconds=row["duration_seconds"], formatted_artists=row["formatted_artists"], - artists=artists, + artists=tartists, ) ) - cursor = conn.execute( - "SELECT cover_image_path FROM releases WHERE virtual_dirname = ?", - (release_virtual_dirname,), - ) - if (row := cursor.fetchone()) and row["cover_image_path"]: - rf.cover = Path(row["cover_image_path"]) - - return rf + return (release, tracks) def get_release_id_from_virtual_dirname(c: Config, release_virtual_dirname: str) -> str | None: diff --git a/rose/cache_test.py b/rose/cache_test.py index 6efdd4e..2d7cd15 100644 --- a/rose/cache_test.py +++ b/rose/cache_test.py @@ -13,13 +13,12 @@ CachedArtist, CachedRelease, CachedTrack, - ReleaseFiles, artist_exists, collage_exists, connect, cover_exists, genre_exists, - get_release_files, + get_release, get_release_id_from_virtual_dirname, get_release_source_path_from_id, get_release_virtual_dirname_from_id, @@ -219,6 +218,29 @@ def test_update_cache_releases_uncached_with_existing_id(config: Config) -> None assert release_id == "ilovecarly" # Hardcoded ID for testing. +def test_update_cache_releases_preserves_track_ids_across_rebuilds(config: Config) -> None: + """Test that track IDs are preserved across cache rebuilds.""" + release_dir = config.music_source_dir / TEST_RELEASE_2.name + shutil.copytree(TEST_RELEASE_2, release_dir) + update_cache_for_releases(config, [release_dir]) + with connect(config) as conn: + cursor = conn.execute("SELECT id FROM tracks") + first_track_ids = {r["id"] for r in cursor} + + # Nuke the database. + config.cache_database_path.unlink() + migrate_database(config) + + # Repeat cache population. + update_cache_for_releases(config, [release_dir]) + with connect(config) as conn: + cursor = conn.execute("SELECT id FROM tracks") + second_track_ids = {r["id"] for r in cursor} + + # Assert IDs are equivalent. + assert first_track_ids == second_track_ids + + def test_update_cache_releases_already_fully_cached(config: Config) -> None: """Test that a fully cached release No Ops when updated again.""" release_dir = config.music_source_dir / TEST_RELEASE_1.name @@ -511,14 +533,14 @@ def test_list_releases(config: Config) -> None: releases = list(list_releases(config)) assert releases == [ CachedRelease( - datafile_mtime=releases[0].datafile_mtime, # IGNORE THIS FIELD. + datafile_mtime="999", id="r1", source_path=Path(config.music_source_dir / "r1"), cover_image_path=None, added_at="0000-01-01T00:00:00+00:00", virtual_dirname="r1", title="Release 1", - type="album", + releasetype="album", year=2023, multidisc=False, new=False, @@ -531,14 +553,14 @@ def test_list_releases(config: Config) -> None: formatted_artists="Techno Man;Bass Man", ), CachedRelease( - datafile_mtime=releases[1].datafile_mtime, # IGNORE THIS FIELD. + datafile_mtime="999", id="r2", source_path=Path(config.music_source_dir / "r2"), cover_image_path=Path(config.music_source_dir / "r2" / "cover.jpg"), added_at="0000-01-01T00:00:00+00:00", virtual_dirname="r2", title="Release 2", - type="album", + releasetype="album", year=2021, multidisc=False, new=False, @@ -551,14 +573,14 @@ def test_list_releases(config: Config) -> None: formatted_artists="Violin Woman feat. Conductor Woman", ), CachedRelease( - datafile_mtime=releases[2].datafile_mtime, # IGNORE THIS FIELD. + datafile_mtime="999", id="r3", source_path=Path(config.music_source_dir / "r3"), cover_image_path=None, added_at="0000-01-01T00:00:00+00:00", virtual_dirname="{NEW} r3", title="Release 3", - type="album", + releasetype="album", year=2021, multidisc=False, new=True, @@ -572,14 +594,14 @@ def test_list_releases(config: Config) -> None: releases = list(list_releases(config, sanitized_artist_filter="Techno Man")) assert releases == [ CachedRelease( - datafile_mtime=releases[0].datafile_mtime, # IGNORE THIS FIELD. + datafile_mtime="999", id="r1", source_path=Path(config.music_source_dir / "r1"), cover_image_path=None, added_at="0000-01-01T00:00:00+00:00", virtual_dirname="r1", title="Release 1", - type="album", + releasetype="album", year=2023, multidisc=False, new=False, @@ -596,14 +618,14 @@ def test_list_releases(config: Config) -> None: releases = list(list_releases(config, sanitized_genre_filter="Techno")) assert releases == [ CachedRelease( - datafile_mtime=releases[0].datafile_mtime, # IGNORE THIS FIELD. + datafile_mtime="999", id="r1", source_path=Path(config.music_source_dir / "r1"), cover_image_path=None, added_at="0000-01-01T00:00:00+00:00", virtual_dirname="r1", title="Release 1", - type="album", + releasetype="album", year=2023, multidisc=False, new=False, @@ -620,14 +642,14 @@ def test_list_releases(config: Config) -> None: releases = list(list_releases(config, sanitized_label_filter="Silk Music")) assert releases == [ CachedRelease( - datafile_mtime=releases[0].datafile_mtime, # IGNORE THIS FIELD. + datafile_mtime="999", id="r1", source_path=Path(config.music_source_dir / "r1"), cover_image_path=None, added_at="0000-01-01T00:00:00+00:00", virtual_dirname="r1", title="Release 1", - type="album", + releasetype="album", year=2023, multidisc=False, new=False, @@ -643,9 +665,29 @@ def test_list_releases(config: Config) -> None: @pytest.mark.usefixtures("seeded_cache") -def test_get_release_files(config: Config) -> None: - assert get_release_files(config, "r1") == ReleaseFiles( - tracks=[ +def test_get_release(config: Config) -> None: + assert get_release(config, "r1") == ( + CachedRelease( + datafile_mtime="999", + id="r1", + source_path=Path(config.music_source_dir / "r1"), + cover_image_path=None, + added_at="0000-01-01T00:00:00+00:00", + virtual_dirname="r1", + title="Release 1", + releasetype="album", + year=2023, + multidisc=False, + new=False, + genres=["Deep House", "Techno"], + labels=["Silk Music"], + artists=[ + CachedArtist(name="Bass Man", role="main"), + CachedArtist(name="Techno Man", role="main"), + ], + formatted_artists="Techno Man;Bass Man", + ), + [ CachedTrack( id="t1", source_path=config.music_source_dir / "r1" / "01.m4a", @@ -679,7 +721,6 @@ def test_get_release_files(config: Config) -> None: formatted_artists="Techno Man;Bass Man", ), ], - cover=None, ) diff --git a/rose/collages.py b/rose/collages.py index f06b574..3ba28b0 100644 --- a/rose/collages.py +++ b/rose/collages.py @@ -25,21 +25,39 @@ class DescriptionMismatchError(RoseError): pass -def create_collage(c: Config, collage_name: str) -> None: - (c.music_source_dir / "!collages").mkdir(parents=True, exist_ok=True) - collage_path(c, collage_name).touch() - update_cache_for_collages(c, [collage_name], force=True) +class CollageDoesNotExistError(RoseError): + pass -def delete_collage(c: Config, collage_name: str) -> None: - send2trash(collage_path(c, collage_name)) +class CollageAlreadyExistsError(RoseError): + pass + + +def create_collage(c: Config, name: str) -> None: + (c.music_source_dir / "!collages").mkdir(parents=True, exist_ok=True) + path = collage_path(c, name) + if path.exists(): + raise CollageAlreadyExistsError(f"Collage {name} already exists") + path.touch() + update_cache_for_collages(c, [name], force=True) + + +def delete_collage(c: Config, name: str) -> None: + path = collage_path(c, name) + if not path.exists(): + raise CollageDoesNotExistError(f"Collage {name} does not exist") + send2trash(path) update_cache_evict_nonexistent_collages(c) def rename_collage(c: Config, old_name: str, new_name: str) -> None: logger.info(f"Renaming collage {old_name} to {new_name}") old_path = collage_path(c, old_name) + if not old_path.exists(): + raise CollageDoesNotExistError(f"Collage {old_name} does not exist") new_path = collage_path(c, new_name) + if new_path.exists(): + raise CollageAlreadyExistsError(f"Collage {new_name} already exists") old_path.rename(new_path) update_cache_for_collages(c, [new_name], force=True) update_cache_evict_nonexistent_collages(c) @@ -51,12 +69,14 @@ def delete_release_from_collage( release_id_or_virtual_dirname: str, ) -> None: release_id, release_dirname = resolve_release_ids(c, release_id_or_virtual_dirname) - fpath = collage_path(c, collage_name) - with fpath.open("rb") as fp: + path = collage_path(c, collage_name) + if not path.exists(): + raise CollageDoesNotExistError(f"Collage {collage_name} does not exist") + with path.open("rb") as fp: data = tomllib.load(fp) data["releases"] = data.get("releases", []) data["releases"] = [r for r in data.get("releases", []) if r["uuid"] != release_id] - with fpath.open("wb") as fp: + with path.open("wb") as fp: tomli_w.dump(data, fp) logger.info(f"Removed release {release_dirname} from collage {collage_name}") update_cache_for_collages(c, [collage_name], force=True) @@ -68,8 +88,10 @@ def add_release_to_collage( release_id_or_virtual_dirname: str, ) -> None: release_id, release_dirname = resolve_release_ids(c, release_id_or_virtual_dirname) - fpath = collage_path(c, collage_name) - with fpath.open("rb") as fp: + path = collage_path(c, collage_name) + if not path.exists(): + raise CollageDoesNotExistError(f"Collage {collage_name} does not exist") + with path.open("rb") as fp: data = tomllib.load(fp) data["releases"] = data.get("releases", []) # Check to see if release is already in the collage. If so, no op. We don't support duplicate @@ -79,7 +101,7 @@ def add_release_to_collage( logger.debug(f"No-Opping: Release {release_dirname} already in collage {collage_name}") return data["releases"].append({"uuid": release_id, "description_meta": release_dirname}) - with fpath.open("wb") as fp: + with path.open("wb") as fp: tomli_w.dump(data, fp) logger.info(f"Added release {release_dirname} to collage {collage_name}") update_cache_for_collages(c, [collage_name], force=True) @@ -96,15 +118,17 @@ def dump_collages(c: Config) -> str: def edit_collage_in_editor(c: Config, collage_name: str) -> None: - fpath = collage_path(c, collage_name) - with fpath.open("rb") as fp: + path = collage_path(c, collage_name) + if not path.exists(): + raise CollageDoesNotExistError(f"Collage {collage_name} does not exist") + with path.open("rb") as fp: data = tomllib.load(fp) raw_releases = data.get("releases", []) edited_release_descriptions = click.edit( "\n".join([r["description_meta"] for r in raw_releases]) ) if edited_release_descriptions is None: - logger.debug("Output of EDITOR is None; no-opping") + logger.info("Aborting: metadata file not submitted.") return uuid_mapping = {r["description_meta"]: r["uuid"] for r in raw_releases} @@ -120,7 +144,7 @@ def edit_collage_in_editor(c: Config, collage_name: str) -> None: edited_releases.append({"uuid": uuid, "description_meta": desc}) data["releases"] = edited_releases - with fpath.open("wb") as fp: + with path.open("wb") as fp: tomli_w.dump(data, fp) logger.info(f"Edited collage {collage_name} from EDITOR") update_cache_for_collages(c, [collage_name], force=True) diff --git a/rose/releases.py b/rose/releases.py index ad0147c..8eb8555 100644 --- a/rose/releases.py +++ b/rose/releases.py @@ -1,15 +1,22 @@ +from __future__ import annotations + import json import logging -from dataclasses import asdict +from dataclasses import asdict, dataclass from pathlib import Path from typing import Any +import click import tomli_w import tomllib from send2trash import send2trash +from rose.artiststr import ArtistMapping from rose.cache import ( STORED_DATA_FILE_REGEX, + CachedRelease, + CachedTrack, + get_release, get_release_id_from_virtual_dirname, get_release_source_path_from_id, get_release_virtual_dirname_from_id, @@ -20,6 +27,7 @@ ) from rose.common import RoseError, valid_uuid from rose.config import Config +from rose.tagger import AudioFile logger = logging.getLogger() @@ -28,6 +36,10 @@ class ReleaseDoesNotExistError(RoseError): pass +class UnknownArtistRoleError(RoseError): + pass + + class CustomJSONEncoder(json.JSONEncoder): def default(self, obj: Any) -> Any: if isinstance(obj, Path): @@ -75,6 +87,165 @@ def toggle_release_new(c: Config, release_id_or_virtual_dirname: str) -> None: logger.critical(f"Failed to find .rose.toml in {source_path}") +@dataclass +class MetadataArtist: + name: str + role: str + + @staticmethod + def to_mapping(artists: list[MetadataArtist]) -> ArtistMapping: + m = ArtistMapping() + for a in artists: + try: + getattr(m, a.role).append(a.name) + except AttributeError as e: + raise UnknownArtistRoleError( + f"Failed to write tags: Unknown role for artist {a.name}: {a.role}" + ) from e + return m + + +@dataclass +class MetadataTrack: + disc_number: str + track_number: str + title: str + artists: list[MetadataArtist] + + +@dataclass +class MetadataRelease: + title: str + releasetype: str + year: int | None + genres: list[str] + labels: list[str] + artists: list[MetadataArtist] + tracks: dict[str, MetadataTrack] + + @classmethod + def from_cache(cls, release: CachedRelease, tracks: list[CachedTrack]) -> MetadataRelease: + return MetadataRelease( + title=release.title, + releasetype=release.releasetype, + year=release.year, + genres=release.genres, + labels=release.labels, + artists=[ + MetadataArtist(name=a.name, role=a.role) for a in release.artists if not a.alias + ], + tracks={ + t.id: MetadataTrack( + disc_number=t.disc_number, + track_number=t.track_number, + title=t.title, + artists=[ + MetadataArtist(name=a.name, role=a.role) for a in t.artists if not a.alias + ], + ) + for t in tracks + }, + ) + + def serialize(self) -> str: + return tomli_w.dumps(asdict(self)) + + @classmethod + def from_toml(cls, toml: str) -> MetadataRelease: + d = tomllib.loads(toml) + return cls( + title=d["title"], + releasetype=d["releasetype"], + year=d["year"], + genres=d["genres"], + labels=d["labels"], + artists=[MetadataArtist(name=a["name"], role=a["role"]) for a in d["artists"]], + tracks={ + tid: MetadataTrack( + track_number=t["track_number"], + disc_number=t["disc_number"], + title=t["title"], + artists=[MetadataArtist(name=a["name"], role=a["role"]) for a in t["artists"]], + ) + for tid, t in d["tracks"].items() + }, + ) + + +def edit_release(c: Config, release_id_or_virtual_dirname: str) -> None: + cachedata = get_release(c, release_id_or_virtual_dirname) + if not cachedata: + raise ReleaseDoesNotExistError(f"Release {release_id_or_virtual_dirname} does not exist") + release, tracks = cachedata + original_metadata = MetadataRelease.from_cache(release, tracks) + toml = click.edit(original_metadata.serialize(), extension=".toml") + if not toml: + logger.info("Aborting: metadata file not submitted.") + return + release_meta = original_metadata.from_toml(toml) + if original_metadata == release_meta: + logger.info("Aborting: no metadata change detected.") + return + + for t in tracks: + track_meta = release_meta.tracks[t.id] + tags = AudioFile.from_file(t.source_path) + + dirty = False + + # Track tags. + if tags.track_number != track_meta.track_number: + tags.track_number = track_meta.track_number + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: track_number") + if tags.disc_number != track_meta.disc_number: + tags.disc_number = track_meta.disc_number + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: disc_number") + if tags.title != track_meta.title: + tags.title = track_meta.title + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: title") + tart = MetadataArtist.to_mapping(track_meta.artists) + if tags.artists != tart: + tags.artists = tart + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: artists") + + # Album tags. + if tags.album != release_meta.title: + tags.album = release_meta.title + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: album") + if tags.release_type != release_meta.releasetype: + tags.release_type = release_meta.releasetype + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: release_type") + if tags.year != release_meta.year: + tags.year = release_meta.year + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: year") + if tags.genre != release_meta.genres: + tags.genre = release_meta.genres + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: genre") + if tags.label != release_meta.labels: + tags.label = release_meta.labels + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: label") + aart = MetadataArtist.to_mapping(release_meta.artists) + if tags.album_artists != aart: + tags.album_artists = aart + dirty = True + logger.debug(f"Modified tag detected for {t.source_path}: album_artists") + + if dirty: + logger.info(f"Flushing changed tags to {t.source_path}") + tags.flush() + + update_cache_for_releases(c, [release.source_path], force=True) + + def resolve_release_ids(c: Config, release_id_or_virtual_dirname: str) -> tuple[str, str]: if valid_uuid(release_id_or_virtual_dirname): uuid = release_id_or_virtual_dirname @@ -83,5 +254,7 @@ def resolve_release_ids(c: Config, release_id_or_virtual_dirname: str) -> tuple[ virtual_dirname = release_id_or_virtual_dirname uuid = get_release_id_from_virtual_dirname(c, virtual_dirname) # type: ignore if uuid is None or virtual_dirname is None: - raise ReleaseDoesNotExistError(f"Release {uuid} ({virtual_dirname}) does not exist") + raise ReleaseDoesNotExistError( + f"Release {uuid or ''}{virtual_dirname or ''} does not exist" + ) return uuid, virtual_dirname diff --git a/rose/releases_test.py b/rose/releases_test.py index cd9d089..b779ef3 100644 --- a/rose/releases_test.py +++ b/rose/releases_test.py @@ -1,15 +1,18 @@ import shutil +from pathlib import Path +from typing import Any import pytest import tomllib from conftest import TEST_RELEASE_1 -from rose.cache import connect, update_cache +from rose.cache import CachedArtist, CachedRelease, CachedTrack, connect, get_release, update_cache from rose.config import Config from rose.releases import ( ReleaseDoesNotExistError, delete_release, dump_releases, + edit_release, resolve_release_ids, toggle_release_new, ) @@ -59,6 +62,109 @@ def test_toggle_release_new(config: Config) -> None: assert cursor.fetchone()["virtual_dirname"].startswith("{NEW} ") +def test_edit_release(monkeypatch: Any, config: Config, source_dir: Path) -> None: + release_path = source_dir / TEST_RELEASE_1.name + with connect(config) as conn: + cursor = conn.execute("SELECT id FROM releases WHERE source_path = ?", (str(release_path),)) + release_id = cursor.fetchone()["id"] + cursor = conn.execute( + "SELECT id FROM tracks WHERE release_id = ? ORDER BY track_number", (str(release_id),) + ) + track_ids = [r["id"] for r in cursor] + assert len(track_ids) == 2 + + new_toml = f""" + title = "I Really Love Blackpink" + releasetype = "single" + year = 2222 + genres = [ + "J-Pop", + "Pop-Rap", + ] + labels = [ + "YG Entertainment", + ] + artists = [ + {{ name = "BLACKPINK", role = "main" }}, + {{ name = "JISOO", role = "main" }}, + ] + + [tracks.{track_ids[0]}] + disc_number = "1" + track_number = "1" + title = "I Do Like That" + artists = [ + {{ name = "BLACKPINK", role = "main" }}, + ] + + [tracks.{track_ids[1]}] + disc_number = "1" + track_number = "2" + title = "All Eyes On Me" + artists = [ + {{ name = "JISOO", role = "main" }}, + ] + """ + monkeypatch.setattr("rose.collages.click.edit", lambda *_, **__: new_toml) + + edit_release(config, release_id) + rdata = get_release(config, release_id) + assert rdata is not None + release, tracks = rdata + assert release == CachedRelease( + id=release_id, + source_path=release_path, + cover_image_path=None, + added_at=release.added_at, + datafile_mtime=release.datafile_mtime, + virtual_dirname="{NEW} BLACKPINK;JISOO - 2222. I Really Love Blackpink - Single [J-Pop;Pop-Rap] {YG Entertainment}", # noqa: E501 + title="I Really Love Blackpink", + releasetype="single", + year=2222, + new=True, + multidisc=False, + genres=["J-Pop", "Pop-Rap"], + labels=["YG Entertainment"], + artists=[ + CachedArtist(name="BLACKPINK", role="main", alias=False), + CachedArtist(name="JISOO", role="main", alias=False), + ], + formatted_artists="BLACKPINK;JISOO", + ) + assert tracks == [ + CachedTrack( + id=track_ids[0], + source_path=release_path / "01.m4a", + source_mtime=tracks[0].source_mtime, + virtual_filename="01. I Do Like That.m4a", + title="I Do Like That", + release_id=release_id, + track_number="1", + disc_number="1", + duration_seconds=2, + artists=[ + CachedArtist(name="BLACKPINK", role="main", alias=False), + ], + formatted_artists="BLACKPINK", + ), + CachedTrack( + id=track_ids[1], + source_path=release_path / "02.m4a", + source_mtime=tracks[1].source_mtime, + virtual_filename="02. All Eyes On Me.m4a", + title="All Eyes On Me", + release_id=release_id, + track_number="2", + disc_number="1", + duration_seconds=2, + artists=[ + CachedArtist(name="JISOO", role="main", alias=False), + ], + formatted_artists="JISOO", + ), + ] + + def test_resolve_release_ids(config: Config) -> None: shutil.copytree(TEST_RELEASE_1, config.music_source_dir / TEST_RELEASE_1.name) update_cache(config) diff --git a/rose/tagger.py b/rose/tagger.py index 227100f..d8cb68d 100644 --- a/rose/tagger.py +++ b/rose/tagger.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextlib import re from dataclasses import dataclass from pathlib import Path @@ -13,7 +14,7 @@ import mutagen.oggopus import mutagen.oggvorbis -from rose.artiststr import Artists, format_artist_string, parse_artist_string +from rose.artiststr import ArtistMapping, format_artist_string, parse_artist_string from rose.common import RoseError TAG_SPLITTER_REGEX = re.compile(r" \\\\ | / |; ?| vs\. ") @@ -31,6 +32,7 @@ class UnsupportedTagValueTypeError(RoseError): @dataclass class AudioFile: + id: str | None title: str | None year: int | None track_number: str | None @@ -40,8 +42,8 @@ class AudioFile: label: list[str] release_type: str | None - album_artists: Artists - artists: Artists + album_artists: ArtistMapping + artists: ArtistMapping duration_sec: int @@ -68,6 +70,7 @@ def _get_paired_frame(x: str) -> str | None: return None return AudioFile( + id=_get_tag(m.tags, ["TXXX:ROSEID"]), title=_get_tag(m.tags, ["TIT2"]), year=_parse_year(_get_tag(m.tags, ["TDRC", "TYER"])), track_number=_parse_num(_get_tag(m.tags, ["TRCK"], first=True)), @@ -90,6 +93,7 @@ def _get_paired_frame(x: str) -> str | None: ) if isinstance(m, mutagen.mp4.MP4): return AudioFile( + id=_get_tag(m.tags, ["----:net.sunsetglow.rose:ID"]), title=_get_tag(m.tags, ["\xa9nam"]), year=_parse_year(_get_tag(m.tags, ["\xa9day"])), track_number=_get_tag(m.tags, ["trkn"], first=True), @@ -112,6 +116,7 @@ def _get_paired_frame(x: str) -> str | None: ) if isinstance(m, (mutagen.flac.FLAC, mutagen.oggvorbis.OggVorbis, mutagen.oggopus.OggOpus)): return AudioFile( + id=_get_tag(m.tags, ["roseid"]), title=_get_tag(m.tags, ["title"]), year=_parse_year(_get_tag(m.tags, ["date", "year"])), track_number=_get_tag(m.tags, ["tracknumber"], first=True), @@ -146,7 +151,8 @@ def flush(self) -> None: def _write_standard_tag(key: str, value: str | None) -> None: m.tags.delall(key) frame = getattr(mutagen.id3, key)(text=value) - m.tags.add(frame) + if value: + m.tags.add(frame) def _write_tag_with_description(name: str, value: str | None) -> None: key, desc = name.split(":", 1) @@ -154,11 +160,13 @@ def _write_tag_with_description(name: str, value: str | None) -> None: # the other tags with the shared prefix key. keep_fields = [f for f in m.tags.getall(key) if getattr(f, "desc", None) != desc] m.tags.delall(key) - frame = getattr(mutagen.id3, key)(desc=desc, text=value) - m.tags.add(frame) + if value: + frame = getattr(mutagen.id3, key)(desc=desc, text=value) + m.tags.add(frame) for f in keep_fields: m.tags.add(f) + _write_tag_with_description("TXXX:ROSEID", self.id) _write_standard_tag("TIT2", self.title) _write_standard_tag("TDRC", str(self.year)) _write_standard_tag("TRCK", self.track_number) @@ -169,19 +177,39 @@ def _write_tag_with_description(name: str, value: str | None) -> None: _write_tag_with_description("TXXX:RELEASETYPE", self.release_type) _write_standard_tag("TPE2", format_artist_string(self.album_artists, self.genre)) _write_standard_tag("TPE1", format_artist_string(self.artists, self.genre)) + # Wipe the alt. role artist tags, since we encode the full artist into the main tag. + m.tags.delall("TPE4") + m.tags.delall("TCOM") + m.tags.delall("TPE3") + # Delete all paired text frames, since these represent additional artist roles. We don't + # want to preserve them. + m.tags.delall("TIPL") + m.tags.delall("IPLS") m.save() return if isinstance(m, mutagen.mp4.MP4): if m.tags is None: m.tags = mutagen.mp4.MP4Tags() - m.tags["\xa9nam"] = self.title + m.tags["----:net.sunsetglow.rose:ID"] = (self.id or "").encode() + m.tags["\xa9nam"] = self.title or "" m.tags["\xa9day"] = str(self.year) - m.tags["\xa9alb"] = self.album + m.tags["\xa9alb"] = self.album or "" m.tags["\xa9gen"] = ";".join(self.genre) m.tags["----:com.apple.iTunes:LABEL"] = ";".join(self.label).encode() m.tags["----:com.apple.iTunes:RELEASETYPE"] = (self.release_type or "").encode() m.tags["aART"] = format_artist_string(self.album_artists, self.genre) m.tags["\xa9ART"] = format_artist_string(self.artists, self.genre) + # Wipe the alt. role artist tags, since we encode the full artist into the main tag. + with contextlib.suppress(KeyError): + del m.tags["----:com.apple.iTunes:REMIXER"] + with contextlib.suppress(KeyError): + del m.tags["----:com.apple.iTunes:PRODUCER"] + with contextlib.suppress(KeyError): + del m.tags["\xa9wrt"] + with contextlib.suppress(KeyError): + del m.tags["----:com.apple.iTunes:CONDUCTOR"] + with contextlib.suppress(KeyError): + del m.tags["----:com.apple.iTunes:DJMIXER"] # The track and disc numbers in MP4 are a bit annoying, because they must be a # single-element list of 2-tuple ints. We preserve the previous tracktotal/disctotal (as @@ -215,16 +243,28 @@ def _write_tag_with_description(name: str, value: str | None) -> None: else: m.tags = mutagen.oggopus.OggOpusVComment() assert not isinstance(m.tags, mutagen.flac.MetadataBlock) - m.tags["title"] = self.title + m.tags["roseid"] = self.id or "" + m.tags["title"] = self.title or "" m.tags["date"] = str(self.year) - m.tags["tracknumber"] = self.track_number - m.tags["discnumber"] = self.disc_number - m.tags["album"] = self.album + m.tags["tracknumber"] = self.track_number or "" + m.tags["discnumber"] = self.disc_number or "" + m.tags["album"] = self.album or "" m.tags["genre"] = ";".join(self.genre) m.tags["organization"] = ";".join(self.label) m.tags["releasetype"] = self.release_type m.tags["albumartist"] = format_artist_string(self.album_artists, self.genre) m.tags["artist"] = format_artist_string(self.artists, self.genre) + # Wipe the alt. role artist tags, since we encode the full artist into the main tag. + with contextlib.suppress(KeyError): + del m.tags["remixer"] + with contextlib.suppress(KeyError): + del m.tags["producer"] + with contextlib.suppress(KeyError): + del m.tags["composer"] + with contextlib.suppress(KeyError): + del m.tags["conductor"] + with contextlib.suppress(KeyError): + del m.tags["djmixer"] m.save() return diff --git a/rose/tagger_test.py b/rose/tagger_test.py index caddae2..c3a44d5 100644 --- a/rose/tagger_test.py +++ b/rose/tagger_test.py @@ -4,7 +4,7 @@ import pytest from conftest import TEST_TAGGER -from rose.artiststr import Artists +from rose.artiststr import ArtistMapping from rose.tagger import AudioFile, _split_tag @@ -31,7 +31,7 @@ def test_getters(filename: str, track_num: str, duration: int) -> None: assert tf.label == ["A Cool Label"] assert tf.album_artists.main == ["Artist A", "Artist B"] - assert tf.artists == Artists( + assert tf.artists == ArtistMapping( main=["Artist GH", "Artist HI"], guest=["Artist C", "Artist A"], remixer=["Artist AB", "Artist BC"], @@ -56,7 +56,11 @@ def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int) """Test the flush by flushing the file, then asserting that all the tags still read properly.""" fpath = isolated_dir / filename shutil.copyfile(TEST_TAGGER / filename, fpath) - AudioFile.from_file(fpath).flush() + tf = AudioFile.from_file(fpath) + # Inject one special case into here: modify the djmixer artist. This checks that we also clear + # the original djmixer tag, so that the next read does not contain Artist EF and Artist FG. + tf.artists.djmixer = ["New"] + tf.flush() tf = AudioFile.from_file(fpath) assert tf.track_number == track_num @@ -70,13 +74,13 @@ def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int) assert tf.label == ["A Cool Label"] assert tf.album_artists.main == ["Artist A", "Artist B"] - assert tf.artists == Artists( + assert tf.artists == ArtistMapping( main=["Artist GH", "Artist HI"], guest=["Artist C", "Artist A"], remixer=["Artist AB", "Artist BC"], producer=["Artist CD", "Artist DE"], - composer=["Artist EF", "Artist FG"], - djmixer=["Artist IJ", "Artist JK"], + composer=[], # Composer gets wiped because we're not of the classical genre :-) + djmixer=["New"], ) assert tf.duration_sec == duration diff --git a/rose/virtualfs.py b/rose/virtualfs.py index 4651878..32c93ce 100644 --- a/rose/virtualfs.py +++ b/rose/virtualfs.py @@ -19,7 +19,7 @@ collage_has_release, cover_exists, genre_exists, - get_release_files, + get_release, label_exists, list_artists, list_collage_releases, @@ -124,18 +124,21 @@ def readdir(self, path: str, _: int) -> Iterator[str]: "7. Collages", ] elif p.release: - rf = get_release_files(self.config, p.release) - for track in rf.tracks: + cachedata = get_release(self.config, p.release) + if not cachedata: + raise fuse.FuseOSError(errno.ENOENT) from None + release, tracks = cachedata + for track in tracks: yield track.virtual_filename self.getattr_cache[path + "/" + track.virtual_filename] = ( time.time(), ("file", track.source_path), ) - if rf.cover: - yield rf.cover.name - self.getattr_cache[path + "/" + rf.cover.name] = ( + if release.cover_image_path: + yield release.cover_image_path.name + self.getattr_cache[path + "/" + release.cover_image_path.name] = ( time.time(), - ("file", rf.cover), + ("file", release.cover_image_path), ) elif p.artist or p.genre or p.label or p.view == "Releases" or p.view == "New": if ( @@ -203,12 +206,14 @@ def open(self, path: str, flags: int) -> int: logger.debug(f"Parsed open path as {p}") if p.release and p.file: - rf = get_release_files(self.config, p.release) - if rf.cover and p.file == rf.cover.name: - return os.open(str(rf.cover), flags) - for track in rf.tracks: - if track.virtual_filename == p.file: - return os.open(str(track.source_path), flags) + cachedata = get_release(self.config, p.release) + if cachedata: + release, tracks = cachedata + if release.cover_image_path and p.file == release.cover_image_path.name: + return os.open(str(release.cover_image_path), flags) + for track in tracks: + if track.virtual_filename == p.file: + return os.open(str(track.source_path), flags) if flags & os.O_CREAT == os.O_CREAT: raise fuse.FuseOSError(errno.EACCES) @@ -232,12 +237,14 @@ def truncate(self, path: str, length: int, fh: int | None = None) -> None: p = parse_virtual_path(path) logger.debug(f"Parsed truncate path as {p}") if p.release and p.file: - rf = get_release_files(self.config, p.release) - if rf.cover and p.file == rf.cover.name: - os.truncate(str(rf.cover), length) - for track in rf.tracks: - if track.virtual_filename == p.file: - return os.truncate(str(track.source_path), length) + cachedata = get_release(self.config, p.release) + if cachedata: + release, tracks = cachedata + if release.cover_image_path and p.file == release.cover_image_path.name: + os.truncate(str(release.cover_image_path), length) + for track in tracks: + if track.virtual_filename == p.file: + return os.truncate(str(track.source_path), length) def release(self, path: str, fh: int) -> None: logger.debug(f"Received release for {path=} {fh=}") diff --git a/rose/watcher_test.py b/rose/watcher_test.py index 47ffe3f..044387c 100644 --- a/rose/watcher_test.py +++ b/rose/watcher_test.py @@ -27,7 +27,7 @@ def retry_for_sec(timeout_sec: float) -> Iterator[None]: yield time.sleep(0.005) if time.time() - start >= timeout_sec: - raise StopIteration + break def test_watchdog_events(config: Config) -> None: @@ -35,7 +35,7 @@ def test_watchdog_events(config: Config) -> None: with start_watcher(config): # Create release. shutil.copytree(TEST_RELEASE_2, src / TEST_RELEASE_2.name) - for _ in retry_for_sec(1): + for _ in retry_for_sec(2): with connect(config) as conn: cursor = conn.execute("SELECT id FROM releases") if {r["id"] for r in cursor.fetchall()} == {"ilovecarly"}: @@ -45,7 +45,7 @@ def test_watchdog_events(config: Config) -> None: # Create another release. shutil.copytree(TEST_RELEASE_3, src / TEST_RELEASE_3.name) - for _ in retry_for_sec(1): + for _ in retry_for_sec(2): with connect(config) as conn: cursor = conn.execute("SELECT id FROM releases") if {r["id"] for r in cursor.fetchall()} == {"ilovecarly", "ilovenewjeans"}: @@ -55,7 +55,7 @@ def test_watchdog_events(config: Config) -> None: # Create collage. shutil.copytree(TEST_COLLAGE_1, src / "!collages") - for _ in retry_for_sec(1): + for _ in retry_for_sec(2): with connect(config) as conn: cursor = conn.execute("SELECT name FROM collages") if {r["name"] for r in cursor.fetchall()} != {"Rose Gold"}: @@ -74,7 +74,7 @@ def test_watchdog_events(config: Config) -> None: # Delete release. shutil.rmtree(src / TEST_RELEASE_3.name) - for _ in retry_for_sec(1): + for _ in retry_for_sec(2): with connect(config) as conn: cursor = conn.execute("SELECT id FROM releases") if {r["id"] for r in cursor.fetchall()} != {"ilovecarly"}: @@ -89,7 +89,7 @@ def test_watchdog_events(config: Config) -> None: # Rename release. (src / TEST_RELEASE_2.name).rename(src / "lalala") time.sleep(0.5) - for _ in retry_for_sec(1): + for _ in retry_for_sec(2): with connect(config) as conn: cursor = conn.execute("SELECT id, source_path FROM releases") rows = cursor.fetchall() @@ -106,7 +106,7 @@ def test_watchdog_events(config: Config) -> None: # Rename collage. (src / "!collages" / "Rose Gold.toml").rename(src / "!collages" / "Black Pink.toml") - for _ in retry_for_sec(1): + for _ in retry_for_sec(2): with connect(config) as conn: cursor = conn.execute("SELECT name FROM collages") if {r["name"] for r in cursor.fetchall()} != {"Black Pink"}: @@ -120,7 +120,7 @@ def test_watchdog_events(config: Config) -> None: # Delete collage. (src / "!collages" / "Black Pink.toml").unlink() - for _ in retry_for_sec(1): + for _ in retry_for_sec(2): with connect(config) as conn: cursor = conn.execute("SELECT COUNT(*) FROM collages") if cursor.fetchone()[0] == 0: