diff --git a/rose/cache.py b/rose/cache.py index 8392ea1..1a0b1a6 100644 --- a/rose/cache.py +++ b/rose/cache.py @@ -20,7 +20,7 @@ from rose.artiststr import format_artist_string from rose.config import Config -from rose.tagger import AudioFile +from rose.tagger import SUPPORTED_EXTENSIONS, AudioFile logger = logging.getLogger(__name__) @@ -163,29 +163,6 @@ class StoredDataFile: stem + ext for stem in ["cover", "folder", "art"] for ext in [".jpg", ".jpeg", ".png"] ] -SUPPORTED_EXTENSIONS = [ - ".mp3", - ".m4a", - ".ogg", - ".opus", - ".flac", -] - -SUPPORTED_RELEASE_TYPES = [ - "album", - "single", - "ep", - "compilation", - "soundtrack", - "live", - "remix", - "djmix", - "mixtape", - "other", - "bootleg", - "unknown", -] - RELEASE_TYPE_FORMATTER = { "album": "Album", "single": "Single", @@ -494,7 +471,7 @@ def _update_cache_for_releases_executor( # any tracks, skip it. And if it does not have any tracks, but is in the cache, remove # it from the cache. for f in files: - if any(f.name.endswith(ext) for ext in SUPPORTED_EXTENSIONS): + if any(f.name.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS): break else: logger.debug(f"Did not find any audio files in release {source_path}, skipping") @@ -619,7 +596,7 @@ def _update_cache_for_releases_executor( # tags. pulled_release_tags = False for f in files: - if not any(f.name.endswith(ext) for ext in SUPPORTED_EXTENSIONS): + if not any(f.name.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS): continue track_path = Path(f.path).resolve() cached_track = cached_tracks.get(str(track_path), None) @@ -645,11 +622,7 @@ def _update_cache_for_releases_executor( release.title = release_title release_dirty = True - release_type = ( - tags.release_type.lower() - if tags.release_type and tags.release_type.lower() in SUPPORTED_RELEASE_TYPES - else "unknown" - ) + release_type = tags.release_type if release_type != release.releasetype: logger.debug(f"Release type change detected for {source_path}, updating") release.releasetype = release_type diff --git a/rose/tagger.py b/rose/tagger.py index 8bd764d..1d0238d 100644 --- a/rose/tagger.py +++ b/rose/tagger.py @@ -2,6 +2,7 @@ import contextlib import re +import sys from dataclasses import dataclass from pathlib import Path from typing import Any, no_type_check @@ -21,6 +22,39 @@ YEAR_REGEX = re.compile(r"\d{4}$") DATE_REGEX = re.compile(r"(\d{4})-\d{2}-\d{2}") +SUPPORTED_EXTENSIONS = [ + ".mp3", + ".m4a", + ".ogg", + ".opus", + ".flac", +] + +SUPPORTED_RELEASE_TYPES = [ + "album", + "single", + "ep", + "compilation", + "soundtrack", + "live", + "remix", + "djmix", + "mixtape", + "other", + "bootleg", + "unknown", +] + + +def _normalize_rtype(x: str | None) -> str: + """Determine the release type of a release.""" + if not x: + return "unknown" + x = x.lower() + if x in SUPPORTED_RELEASE_TYPES: + return x + return "unknown" + class UnsupportedFiletypeError(RoseError): pass @@ -40,7 +74,7 @@ class AudioFile: album: str | None genre: list[str] label: list[str] - release_type: str | None + release_type: str album_artists: ArtistMapping artists: ArtistMapping @@ -52,6 +86,8 @@ class AudioFile: @classmethod def from_file(cls, p: Path) -> AudioFile: """Read the tags of an audio file on disk.""" + if not any(p.suffix.lower() == ext for ext in SUPPORTED_EXTENSIONS): + raise UnsupportedFiletypeError(f"{p.suffix} not a supported filetype") m = mutagen.File(p) # type: ignore if isinstance(m, mutagen.mp3.MP3): # ID3 returns trackno/discno tags as no/total. We have to parse. @@ -78,7 +114,7 @@ def _get_paired_frame(x: str) -> str | None: album=_get_tag(m.tags, ["TALB"]), genre=_split_tag(_get_tag(m.tags, ["TCON"])), label=_split_tag(_get_tag(m.tags, ["TPUB"])), - release_type=_get_tag(m.tags, ["TXXX:RELEASETYPE"], first=True), + release_type=_normalize_rtype(_get_tag(m.tags, ["TXXX:RELEASETYPE"], first=True)), album_artists=parse_artist_string(main=_get_tag(m.tags, ["TPE2"])), artists=parse_artist_string( main=_get_tag(m.tags, ["TPE1"]), @@ -101,7 +137,9 @@ def _get_paired_frame(x: str) -> str | None: album=_get_tag(m.tags, ["\xa9alb"]), genre=_split_tag(_get_tag(m.tags, ["\xa9gen"])), label=_split_tag(_get_tag(m.tags, ["----:com.apple.iTunes:LABEL"])), - release_type=_get_tag(m.tags, ["----:com.apple.iTunes:RELEASETYPE"], first=True), + release_type=_normalize_rtype( + _get_tag(m.tags, ["----:com.apple.iTunes:RELEASETYPE"], first=True) + ), album_artists=parse_artist_string(main=_get_tag(m.tags, ["aART"])), artists=parse_artist_string( main=_get_tag(m.tags, ["\xa9ART"]), @@ -124,7 +162,7 @@ def _get_paired_frame(x: str) -> str | None: album=_get_tag(m.tags, ["album"]), genre=_split_tag(_get_tag(m.tags, ["genre"])), label=_split_tag(_get_tag(m.tags, ["organization", "label", "recordlabel"])), - release_type=_get_tag(m.tags, ["releasetype"], first=True), + release_type=_normalize_rtype(_get_tag(m.tags, ["releasetype"], first=True)), album_artists=parse_artist_string(main=_get_tag(m.tags, ["albumartist"])), artists=parse_artist_string( main=_get_tag(m.tags, ["artist"]), @@ -140,9 +178,17 @@ def _get_paired_frame(x: str) -> str | None: raise UnsupportedFiletypeError(f"{p} is not a supported audio file") @no_type_check - def flush(self) -> None: + def flush(self, *, validate: bool = True) -> None: """Flush the current tags to the file on disk.""" m = self._m + if not validate and "pytest" not in sys.modules: + raise Exception("Validate can only be turned off by tests.") + + if validate and self.release_type not in SUPPORTED_RELEASE_TYPES: + raise UnsupportedTagValueTypeError( + f"Release type {self.release_type} is not a supported release type.\n" + f"Supported release types: {', '.join(SUPPORTED_RELEASE_TYPES)}" + ) if isinstance(m, mutagen.mp3.MP3): if m.tags is None: diff --git a/rose/tagger_test.py b/rose/tagger_test.py index c3a44d5..d3fee6c 100644 --- a/rose/tagger_test.py +++ b/rose/tagger_test.py @@ -5,7 +5,11 @@ from conftest import TEST_TAGGER from rose.artiststr import ArtistMapping -from rose.tagger import AudioFile, _split_tag +from rose.tagger import ( + AudioFile, + UnsupportedTagValueTypeError, + _split_tag, +) @pytest.mark.parametrize( @@ -24,7 +28,7 @@ def test_getters(filename: str, track_num: str, duration: int) -> None: assert tf.title == f"Track {track_num}" assert tf.album == "A Cool Album" - assert tf.release_type == "Album" + assert tf.release_type == "album" assert tf.year == 1990 assert tf.disc_number == "1" assert tf.genre == ["Electronic", "House"] @@ -67,7 +71,7 @@ def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int) assert tf.title == f"Track {track_num}" assert tf.album == "A Cool Album" - assert tf.release_type == "Album" + assert tf.release_type == "album" assert tf.year == 1990 assert tf.disc_number == "1" assert tf.genre == ["Electronic", "House"] @@ -85,6 +89,34 @@ def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int) assert tf.duration_sec == duration +@pytest.mark.parametrize( + "filename", + ["track1.flac", "track2.m4a", "track3.mp3", "track4.vorbis.ogg", "track5.opus.ogg"], +) +def test_release_type_normalization(isolated_dir: Path, filename: str) -> None: + """Test the flush by flushing the file, then asserting that all the tags still read properly.""" + fpath = isolated_dir / filename + shutil.copyfile(TEST_TAGGER / filename, fpath) + + # Check that release type is read correctly. + tf = AudioFile.from_file(fpath) + assert tf.release_type == "album" + # Assert that attempting to flush a stupid value fails. + tf.release_type = "lalala" + with pytest.raises(UnsupportedTagValueTypeError): + tf.flush() + # Flush it anyways... + tf.flush(validate=False) + # Check that stupid release type is normalized as unknown. + tf = AudioFile.from_file(fpath) + assert tf.release_type == "unknown" + # And now assert that the read is case insensitive. + tf.release_type = "ALBUM" + tf.flush(validate=False) + tf = AudioFile.from_file(fpath) + assert tf.release_type == "album" + + def test_split_tag() -> None: assert _split_tag(r"a \\ b") == ["a", "b"] assert _split_tag(r"a \ b") == [r"a \ b"]