From 92cf7cac2f133bce7e7f62f4d38d351025b7d658 Mon Sep 17 00:00:00 2001 From: blissful Date: Mon, 23 Oct 2023 23:10:03 -0400 Subject: [PATCH] implement tag writing capability in tagger.py --- README.md | 3 +- rose/artiststr.py | 40 +++++-- rose/artiststr_test.py | 5 + rose/tagger.py | 263 ++++++++++++++++++++++++++++------------- rose/tagger_test.py | 48 +++++++- 5 files changed, 265 insertions(+), 94 deletions(-) diff --git a/README.md b/README.md index 2836bb4..a00aab5 100644 --- a/README.md +++ b/README.md @@ -212,12 +212,13 @@ delimiters. for example, `artist=Pyotr Ilyich Tchaikovsky performed by André Pr The artist tag is described by the following grammar: ``` -artist-tag ::= composer dj main guest remixer +artist-tag ::= composer dj main guest remixer producer composer ::= name ' performed by ' dj ::= name ' pres. ' main ::= name guest ::= ' feat. ' name remixer ::= ' remixed by ' name +producer ::= ' produced by ' name name ::= string ';' name | string ``` diff --git a/rose/artiststr.py b/rose/artiststr.py index 9643df4..88e1d32 100644 --- a/rose/artiststr.py +++ b/rose/artiststr.py @@ -1,6 +1,10 @@ +import logging import re from dataclasses import dataclass, field +logger = logging.getLogger(__name__) + + TAG_SPLITTER_REGEX = re.compile(r" \\\\ | / |; ?| vs\. ") @@ -32,6 +36,9 @@ def _split_tag(t: str | None) -> list[str]: li_composer = _split_tag(composer) li_producer = _split_tag(producer) li_dj = _split_tag(dj) + if main and "produced by " in main: + main, producer = re.split(r" ?produced by ", main, maxsplit=1) + li_producer.extend(_split_tag(producer)) if main and "remixed by " in main: main, remixer = re.split(r" ?remixed by ", main, maxsplit=1) li_remixer.extend(_split_tag(remixer)) @@ -47,18 +54,22 @@ def _split_tag(t: str | None) -> list[str]: if main: li_main.extend(_split_tag(main)) - return Artists( - main=li_main, - guest=li_guests, - remixer=li_remixer, - composer=li_composer, - producer=li_producer, - djmixer=li_dj, + rval = Artists( + main=_deduplicate(li_main), + guest=_deduplicate(li_guests), + remixer=_deduplicate(li_remixer), + composer=_deduplicate(li_composer), + producer=_deduplicate(li_producer), + djmixer=_deduplicate(li_dj), + ) + logger.debug( + f"Parsed args {main=} {remixer=} {composer=} {conductor=} {producer=} {dj=} as {rval=}" ) + return rval def format_artist_string(a: Artists, genres: list[str]) -> str: - r = ";".join(a.producer + a.main + a.remixer) + r = ";".join(a.main) if a.composer and "Classical" in genres: r = ";".join(a.composer) + " performed by " + r if a.djmixer: @@ -67,4 +78,17 @@ def format_artist_string(a: Artists, genres: list[str]) -> str: r += " feat. " + ";".join(a.guest) if a.remixer: r += " remixed by " + ";".join(a.remixer) + if a.producer: + r += " produced by " + ";".join(a.producer) + logger.debug(f"Formatted {a} ({genres=}) as {r}") + return r + + +def _deduplicate(xs: list[str]) -> list[str]: + seen: set[str] = set() + r: list[str] = [] + for x in xs: + if x not in seen: + r.append(x) + seen.add(x) return r diff --git a/rose/artiststr_test.py b/rose/artiststr_test.py index 149828d..5854136 100644 --- a/rose/artiststr_test.py +++ b/rose/artiststr_test.py @@ -19,6 +19,11 @@ def test_parse_artist_string() -> None: main=["B", "C"], guest=["D", "E"], ) + # Test the deduplication handling. + assert parse_artist_string("A pres. B", dj="A") == Artists( + djmixer=["A"], + main=["B"], + ) def test_format_artist_string() -> None: diff --git a/rose/tagger.py b/rose/tagger.py index 8effb60..227100f 100644 --- a/rose/tagger.py +++ b/rose/tagger.py @@ -3,7 +3,7 @@ import re from dataclasses import dataclass from pathlib import Path -from typing import Any +from typing import Any, no_type_check import mutagen import mutagen.flac @@ -13,7 +13,7 @@ import mutagen.oggopus import mutagen.oggvorbis -from rose.artiststr import Artists, parse_artist_string +from rose.artiststr import Artists, format_artist_string, parse_artist_string from rose.common import RoseError TAG_SPLITTER_REGEX = re.compile(r" \\\\ | / |; ?| vs\. ") @@ -45,91 +45,190 @@ class AudioFile: duration_sec: int + _m: Any + @classmethod def from_file(cls, p: Path) -> AudioFile: - return _convert_mutagen(mutagen.File(p), p) # type: ignore + """Read the tags of an audio file on disk.""" + m = mutagen.File(p) # type: ignore + if isinstance(m, mutagen.mp3.MP3): + # ID3 returns trackno/discno tags as no/total. We have to parse. + def _parse_num(x: str | None) -> str | None: + return x.split("/")[0] if x else None + def _get_paired_frame(x: str) -> str | None: + if not m.tags: + return None + for tag in ["TIPL", "IPLS"]: + try: + frame = m.tags[tag] + except KeyError: + continue + return r" \\ ".join([p[1] for p in frame.people if p[0].lower() == x.lower()]) + return None -def _convert_mutagen(m: Any, p: Path) -> AudioFile: - if isinstance(m, mutagen.mp3.MP3): - # ID3 returns trackno/discno tags as no/total. We have to parse. - def _parse_num(x: str | None) -> str | None: - return x.split("/")[0] if x else None + return AudioFile( + title=_get_tag(m.tags, ["TIT2"]), + year=_parse_year(_get_tag(m.tags, ["TDRC", "TYER"])), + track_number=_parse_num(_get_tag(m.tags, ["TRCK"], first=True)), + disc_number=_parse_num(_get_tag(m.tags, ["TPOS"], first=True)), + album=_get_tag(m.tags, ["TALB"]), + genre=_split_tag(_get_tag(m.tags, ["TCON"])), + label=_split_tag(_get_tag(m.tags, ["TPUB"])), + release_type=_get_tag(m.tags, ["TXXX:RELEASETYPE"], first=True), + album_artists=parse_artist_string(main=_get_tag(m.tags, ["TPE2"])), + artists=parse_artist_string( + main=_get_tag(m.tags, ["TPE1"]), + remixer=_get_tag(m.tags, ["TPE4"]), + composer=_get_tag(m.tags, ["TCOM"]), + conductor=_get_tag(m.tags, ["TPE3"]), + producer=_get_paired_frame("producer"), + dj=_get_paired_frame("DJ-mix"), + ), + duration_sec=round(m.info.length), + _m=m, + ) + if isinstance(m, mutagen.mp4.MP4): + return AudioFile( + title=_get_tag(m.tags, ["\xa9nam"]), + year=_parse_year(_get_tag(m.tags, ["\xa9day"])), + track_number=_get_tag(m.tags, ["trkn"], first=True), + disc_number=_get_tag(m.tags, ["disk"], first=True), + album=_get_tag(m.tags, ["\xa9alb"]), + genre=_split_tag(_get_tag(m.tags, ["\xa9gen"])), + label=_split_tag(_get_tag(m.tags, ["----:com.apple.iTunes:LABEL"])), + release_type=_get_tag(m.tags, ["----:com.apple.iTunes:RELEASETYPE"], first=True), + album_artists=parse_artist_string(main=_get_tag(m.tags, ["aART"])), + artists=parse_artist_string( + main=_get_tag(m.tags, ["\xa9ART"]), + remixer=_get_tag(m.tags, ["----:com.apple.iTunes:REMIXER"]), + producer=_get_tag(m.tags, ["----:com.apple.iTunes:PRODUCER"]), + composer=_get_tag(m.tags, ["\xa9wrt"]), + conductor=_get_tag(m.tags, ["----:com.apple.iTunes:CONDUCTOR"]), + dj=_get_tag(m.tags, ["----:com.apple.iTunes:DJMIXER"]), + ), + duration_sec=round(m.info.length), # type: ignore + _m=m, + ) + if isinstance(m, (mutagen.flac.FLAC, mutagen.oggvorbis.OggVorbis, mutagen.oggopus.OggOpus)): + return AudioFile( + title=_get_tag(m.tags, ["title"]), + year=_parse_year(_get_tag(m.tags, ["date", "year"])), + track_number=_get_tag(m.tags, ["tracknumber"], first=True), + disc_number=_get_tag(m.tags, ["discnumber"], first=True), + album=_get_tag(m.tags, ["album"]), + genre=_split_tag(_get_tag(m.tags, ["genre"])), + label=_split_tag(_get_tag(m.tags, ["organization", "label", "recordlabel"])), + release_type=_get_tag(m.tags, ["releasetype"], first=True), + album_artists=parse_artist_string(main=_get_tag(m.tags, ["albumartist"])), + artists=parse_artist_string( + main=_get_tag(m.tags, ["artist"]), + remixer=_get_tag(m.tags, ["remixer"]), + producer=_get_tag(m.tags, ["producer"]), + composer=_get_tag(m.tags, ["composer"]), + conductor=_get_tag(m.tags, ["conductor"]), + dj=_get_tag(m.tags, ["djmixer"]), + ), + duration_sec=round(m.info.length), # type: ignore + _m=m, + ) + raise UnsupportedFiletypeError(f"{p} is not a supported audio file") - def _get_paired_frame(x: str) -> str | None: - if not m.tags: - return None - for tag in ["TIPL", "IPLS"]: - try: - frame = m.tags[tag] - except KeyError: - continue - return r" \\ ".join([p[1] for p in frame.people if p[0].lower() == x.lower()]) - return None - - return AudioFile( - title=_get_tag(m.tags, ["TIT2"]), - year=_parse_year(_get_tag(m.tags, ["TDRC", "TYER"])), - track_number=_parse_num(_get_tag(m.tags, ["TRCK"], first=True)), - disc_number=_parse_num(_get_tag(m.tags, ["TPOS"], first=True)), - album=_get_tag(m.tags, ["TALB"]), - genre=_split_tag(_get_tag(m.tags, ["TCON"])), - label=_split_tag(_get_tag(m.tags, ["TPUB"])), - release_type=_get_tag(m.tags, ["TXXX:RELEASETYPE"], first=True), - album_artists=parse_artist_string(main=_get_tag(m.tags, ["TPE2"])), - artists=parse_artist_string( - main=_get_tag(m.tags, ["TPE1"]), - remixer=_get_tag(m.tags, ["TPE4"]), - composer=_get_tag(m.tags, ["TCOM"]), - conductor=_get_tag(m.tags, ["TPE3"]), - producer=_get_paired_frame("producer"), - dj=_get_paired_frame("DJ-mix"), - ), - duration_sec=round(m.info.length), - ) - if isinstance(m, mutagen.mp4.MP4): - return AudioFile( - title=_get_tag(m.tags, ["\xa9nam"]), - year=_parse_year(_get_tag(m.tags, ["\xa9day"])), - track_number=_get_tag(m.tags, ["trkn"], first=True), - disc_number=_get_tag(m.tags, ["disk"], first=True), - album=_get_tag(m.tags, ["\xa9alb"]), - genre=_split_tag(_get_tag(m.tags, ["\xa9gen"])), - label=_split_tag(_get_tag(m.tags, ["----:com.apple.iTunes:LABEL"])), - release_type=_get_tag(m.tags, ["----:com.apple.iTunes:RELEASETYPE"], first=True), - album_artists=parse_artist_string(main=_get_tag(m.tags, ["aART"])), - artists=parse_artist_string( - main=_get_tag(m.tags, ["\xa9ART"]), - remixer=_get_tag(m.tags, ["----:com.apple.iTunes:REMIXER"]), - producer=_get_tag(m.tags, ["----:com.apple.iTunes:PRODUCER"]), - composer=_get_tag(m.tags, ["\xa9wrt"]), - conductor=_get_tag(m.tags, ["----:com.apple.iTunes:CONDUCTOR"]), - dj=_get_tag(m.tags, ["----:com.apple.iTunes:DJMIXER"]), - ), - duration_sec=round(m.info.length), # type: ignore - ) - if isinstance(m, (mutagen.flac.FLAC, mutagen.oggvorbis.OggVorbis, mutagen.oggopus.OggOpus)): - return AudioFile( - title=_get_tag(m.tags, ["title"]), - year=_parse_year(_get_tag(m.tags, ["date", "year"])), - track_number=_get_tag(m.tags, ["tracknumber"], first=True), - disc_number=_get_tag(m.tags, ["discnumber"], first=True), - album=_get_tag(m.tags, ["album"]), - genre=_split_tag(_get_tag(m.tags, ["genre"])), - label=_split_tag(_get_tag(m.tags, ["organization", "label", "recordlabel"])), - release_type=_get_tag(m.tags, ["releasetype"], first=True), - album_artists=parse_artist_string(main=_get_tag(m.tags, ["albumartist"])), - artists=parse_artist_string( - main=_get_tag(m.tags, ["artist"]), - remixer=_get_tag(m.tags, ["remixer"]), - producer=_get_tag(m.tags, ["producer"]), - composer=_get_tag(m.tags, ["composer"]), - conductor=_get_tag(m.tags, ["conductor"]), - dj=_get_tag(m.tags, ["djmixer"]), - ), - duration_sec=round(m.info.length), # type: ignore - ) - raise UnsupportedFiletypeError(f"{p} is not a supported audio file") + @no_type_check + def flush(self) -> None: + """Flush the current tags to the file on disk.""" + m = self._m + + if isinstance(m, mutagen.mp3.MP3): + if m.tags is None: + m.tags = mutagen.id3.ID3() + + def _write_standard_tag(key: str, value: str | None) -> None: + m.tags.delall(key) + frame = getattr(mutagen.id3, key)(text=value) + m.tags.add(frame) + + def _write_tag_with_description(name: str, value: str | None) -> None: + key, desc = name.split(":", 1) + # Since the ID3 tags work with the shared prefix key before `:`, manually preserve + # the other tags with the shared prefix key. + keep_fields = [f for f in m.tags.getall(key) if getattr(f, "desc", None) != desc] + m.tags.delall(key) + frame = getattr(mutagen.id3, key)(desc=desc, text=value) + m.tags.add(frame) + for f in keep_fields: + m.tags.add(f) + + _write_standard_tag("TIT2", self.title) + _write_standard_tag("TDRC", str(self.year)) + _write_standard_tag("TRCK", self.track_number) + _write_standard_tag("TPOS", self.disc_number) + _write_standard_tag("TALB", self.album) + _write_standard_tag("TCON", ";".join(self.genre)) + _write_standard_tag("TPUB", ";".join(self.label)) + _write_tag_with_description("TXXX:RELEASETYPE", self.release_type) + _write_standard_tag("TPE2", format_artist_string(self.album_artists, self.genre)) + _write_standard_tag("TPE1", format_artist_string(self.artists, self.genre)) + m.save() + return + if isinstance(m, mutagen.mp4.MP4): + if m.tags is None: + m.tags = mutagen.mp4.MP4Tags() + m.tags["\xa9nam"] = self.title + m.tags["\xa9day"] = str(self.year) + m.tags["\xa9alb"] = self.album + m.tags["\xa9gen"] = ";".join(self.genre) + m.tags["----:com.apple.iTunes:LABEL"] = ";".join(self.label).encode() + m.tags["----:com.apple.iTunes:RELEASETYPE"] = (self.release_type or "").encode() + m.tags["aART"] = format_artist_string(self.album_artists, self.genre) + m.tags["\xa9ART"] = format_artist_string(self.artists, self.genre) + + # The track and disc numbers in MP4 are a bit annoying, because they must be a + # single-element list of 2-tuple ints. We preserve the previous tracktotal/disctotal (as + # Rose does not care about those values), and then attempt to write our own track_number + # and disc_number. + try: + prev_tracktotal = m.tags["trkn"][0][1] + except (KeyError, IndexError): + prev_tracktotal = 1 + try: + prev_disctotal = m.tags["disk"][0][1] + except (KeyError, IndexError): + prev_disctotal = 1 + try: + m.tags["trkn"] = [(int(self.track_number or "0"), prev_tracktotal)] + m.tags["disk"] = [(int(self.disc_number or "0"), prev_disctotal)] + except ValueError as e: + raise UnsupportedTagValueTypeError( + "Could not write m4a trackno/discno tags: must be integers. " + f"Got: {self.track_number=} / {self.disc_number=}" + ) from e + + m.save() + return + if isinstance(m, (mutagen.flac.FLAC, mutagen.oggvorbis.OggVorbis, mutagen.oggopus.OggOpus)): + if m.tags is None: + if isinstance(m, mutagen.flac.FLAC): + m.tags = mutagen.flac.VCFLACDict() + elif isinstance(m, mutagen.oggvorbis.OggVorbis): + m.tags = mutagen.oggvorbis.OggVCommentDict() + else: + m.tags = mutagen.oggopus.OggOpusVComment() + assert not isinstance(m.tags, mutagen.flac.MetadataBlock) + m.tags["title"] = self.title + m.tags["date"] = str(self.year) + m.tags["tracknumber"] = self.track_number + m.tags["discnumber"] = self.disc_number + m.tags["album"] = self.album + m.tags["genre"] = ";".join(self.genre) + m.tags["organization"] = ";".join(self.label) + m.tags["releasetype"] = self.release_type + m.tags["albumartist"] = format_artist_string(self.album_artists, self.genre) + m.tags["artist"] = format_artist_string(self.artists, self.genre) + m.save() + return + + raise RoseError(f"Impossible: unknown mutagen type: {type(m)=} ({repr(m)=})") def _split_tag(t: str | None) -> list[str]: diff --git a/rose/tagger_test.py b/rose/tagger_test.py index a7ca0e7..caddae2 100644 --- a/rose/tagger_test.py +++ b/rose/tagger_test.py @@ -1,3 +1,6 @@ +import shutil +from pathlib import Path + import pytest from conftest import TEST_TAGGER @@ -6,7 +9,7 @@ @pytest.mark.parametrize( - ("filepath", "track_num", "duration"), + ("filename", "track_num", "duration"), [ ("track1.flac", "1", 2), ("track2.m4a", "2", 2), @@ -15,8 +18,47 @@ ("track5.opus.ogg", "5", 1), ], ) -def test_getters(filepath: str, track_num: str, duration: int) -> None: - tf = AudioFile.from_file(TEST_TAGGER / filepath) +def test_getters(filename: str, track_num: str, duration: int) -> None: + tf = AudioFile.from_file(TEST_TAGGER / filename) + assert tf.track_number == track_num + assert tf.title == f"Track {track_num}" + + assert tf.album == "A Cool Album" + assert tf.release_type == "Album" + assert tf.year == 1990 + assert tf.disc_number == "1" + assert tf.genre == ["Electronic", "House"] + assert tf.label == ["A Cool Label"] + + assert tf.album_artists.main == ["Artist A", "Artist B"] + assert tf.artists == Artists( + main=["Artist GH", "Artist HI"], + guest=["Artist C", "Artist A"], + remixer=["Artist AB", "Artist BC"], + producer=["Artist CD", "Artist DE"], + composer=["Artist EF", "Artist FG"], + djmixer=["Artist IJ", "Artist JK"], + ) + assert tf.duration_sec == duration + + +@pytest.mark.parametrize( + ("filename", "track_num", "duration"), + [ + ("track1.flac", "1", 2), + ("track2.m4a", "2", 2), + ("track3.mp3", "3", 1), + ("track4.vorbis.ogg", "4", 1), + ("track5.opus.ogg", "5", 1), + ], +) +def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int) -> None: + """Test the flush by flushing the file, then asserting that all the tags still read properly.""" + fpath = isolated_dir / filename + shutil.copyfile(TEST_TAGGER / filename, fpath) + AudioFile.from_file(fpath).flush() + tf = AudioFile.from_file(fpath) + assert tf.track_number == track_num assert tf.title == f"Track {track_num}"