From 6c5227ee3fc9028586c5a66025bb2077d0a9a2d3 Mon Sep 17 00:00:00 2001 From: blissful Date: Sat, 28 Oct 2023 23:11:12 -0400 Subject: [PATCH] refresh cache after closing a write through virtualfs; fix multiprocessing.set_start_method breaking xdist --- rose/cli_test.py | 3 ++ rose/tagger_test.py | 86 +++++++++++++++++++++--------------------- rose/virtualfs.py | 50 +++++++++++++++--------- rose/virtualfs_test.py | 32 ++++++++++++---- 4 files changed, 103 insertions(+), 68 deletions(-) diff --git a/rose/cli_test.py b/rose/cli_test.py index 0c0c582..61f8c1d 100644 --- a/rose/cli_test.py +++ b/rose/cli_test.py @@ -1,3 +1,4 @@ +import multiprocessing import os from typing import Any @@ -34,6 +35,8 @@ def mock_exit(x: int) -> None: raise SystemExit(x) monkeypatch.setattr(os, "_exit", mock_exit) + # set_start_method borks a bit in tests; it's not important in this test anyways. + monkeypatch.setattr(multiprocessing, "set_start_method", lambda _: None) ctx = Context(config=config) runner = CliRunner() diff --git a/rose/tagger_test.py b/rose/tagger_test.py index d3fee6c..480772c 100644 --- a/rose/tagger_test.py +++ b/rose/tagger_test.py @@ -23,19 +23,19 @@ ], ) def test_getters(filename: str, track_num: str, duration: int) -> None: - tf = AudioFile.from_file(TEST_TAGGER / filename) - assert tf.track_number == track_num - assert tf.title == f"Track {track_num}" - - assert tf.album == "A Cool Album" - assert tf.release_type == "album" - assert tf.year == 1990 - assert tf.disc_number == "1" - assert tf.genre == ["Electronic", "House"] - assert tf.label == ["A Cool Label"] - - assert tf.album_artists.main == ["Artist A", "Artist B"] - assert tf.artists == ArtistMapping( + af = AudioFile.from_file(TEST_TAGGER / filename) + assert af.track_number == track_num + assert af.title == f"Track {track_num}" + + assert af.album == "A Cool Album" + assert af.release_type == "album" + assert af.year == 1990 + assert af.disc_number == "1" + assert af.genre == ["Electronic", "House"] + assert af.label == ["A Cool Label"] + + assert af.album_artists.main == ["Artist A", "Artist B"] + assert af.artists == ArtistMapping( main=["Artist GH", "Artist HI"], guest=["Artist C", "Artist A"], remixer=["Artist AB", "Artist BC"], @@ -43,7 +43,7 @@ def test_getters(filename: str, track_num: str, duration: int) -> None: composer=["Artist EF", "Artist FG"], djmixer=["Artist IJ", "Artist JK"], ) - assert tf.duration_sec == duration + assert af.duration_sec == duration @pytest.mark.parametrize( @@ -60,25 +60,25 @@ def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int) """Test the flush by flushing the file, then asserting that all the tags still read properly.""" fpath = isolated_dir / filename shutil.copyfile(TEST_TAGGER / filename, fpath) - tf = AudioFile.from_file(fpath) + af = AudioFile.from_file(fpath) # Inject one special case into here: modify the djmixer artist. This checks that we also clear # the original djmixer tag, so that the next read does not contain Artist EF and Artist FG. - tf.artists.djmixer = ["New"] - tf.flush() - tf = AudioFile.from_file(fpath) - - assert tf.track_number == track_num - assert tf.title == f"Track {track_num}" - - assert tf.album == "A Cool Album" - assert tf.release_type == "album" - assert tf.year == 1990 - assert tf.disc_number == "1" - assert tf.genre == ["Electronic", "House"] - assert tf.label == ["A Cool Label"] - - assert tf.album_artists.main == ["Artist A", "Artist B"] - assert tf.artists == ArtistMapping( + af.artists.djmixer = ["New"] + af.flush() + af = AudioFile.from_file(fpath) + + assert af.track_number == track_num + assert af.title == f"Track {track_num}" + + assert af.album == "A Cool Album" + assert af.release_type == "album" + assert af.year == 1990 + assert af.disc_number == "1" + assert af.genre == ["Electronic", "House"] + assert af.label == ["A Cool Label"] + + assert af.album_artists.main == ["Artist A", "Artist B"] + assert af.artists == ArtistMapping( main=["Artist GH", "Artist HI"], guest=["Artist C", "Artist A"], remixer=["Artist AB", "Artist BC"], @@ -86,7 +86,7 @@ def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int) composer=[], # Composer gets wiped because we're not of the classical genre :-) djmixer=["New"], ) - assert tf.duration_sec == duration + assert af.duration_sec == duration @pytest.mark.parametrize( @@ -99,22 +99,22 @@ def test_release_type_normalization(isolated_dir: Path, filename: str) -> None: shutil.copyfile(TEST_TAGGER / filename, fpath) # Check that release type is read correctly. - tf = AudioFile.from_file(fpath) - assert tf.release_type == "album" + af = AudioFile.from_file(fpath) + assert af.release_type == "album" # Assert that attempting to flush a stupid value fails. - tf.release_type = "lalala" + af.release_type = "lalala" with pytest.raises(UnsupportedTagValueTypeError): - tf.flush() + af.flush() # Flush it anyways... - tf.flush(validate=False) + af.flush(validate=False) # Check that stupid release type is normalized as unknown. - tf = AudioFile.from_file(fpath) - assert tf.release_type == "unknown" + af = AudioFile.from_file(fpath) + assert af.release_type == "unknown" # And now assert that the read is case insensitive. - tf.release_type = "ALBUM" - tf.flush(validate=False) - tf = AudioFile.from_file(fpath) - assert tf.release_type == "album" + af.release_type = "ALBUM" + af.flush(validate=False) + af = AudioFile.from_file(fpath) + assert af.release_type == "album" def test_split_tag() -> None: diff --git a/rose/virtualfs.py b/rose/virtualfs.py index b0b83fa..7ffbabb 100644 --- a/rose/virtualfs.py +++ b/rose/virtualfs.py @@ -53,6 +53,7 @@ genre_exists, get_playlist, get_release, + get_release_source_path_from_id, label_exists, list_artists, list_collage_releases, @@ -63,6 +64,7 @@ list_releases, release_exists, track_exists, + update_cache_for_releases, ) from rose.collages import ( add_release_to_collage, @@ -352,7 +354,7 @@ def next(self) -> int: class RoseLogicalFS: def __init__(self, config: Config, fhgen: FileHandleGenerator): self.config = config - # We use this object to determine whether we should show an artist/genre/label + self.fhgen = fhgen self.can_show = CanShower(config) # We implement the "add track to playlist" operation in a slightly special way. Unlike # releases, where the virtual dirname is globally unique, track filenames are not globally @@ -373,7 +375,11 @@ def __init__(self, config: Config, fhgen: FileHandleGenerator): # # The state is a mapping of fh -> (playlist_name, ext, bytes). self.playlist_additions_in_progress: dict[int, tuple[str, str, bytearray]] = {} - self.fhgen = fhgen + # We want to trigger a cache update whenever we notice that a file has been updated through + # the virtual filesystem. To do this, we insert the file handle and release ID on open, and + # then trigger the cache update on release. We use this variable to transport that state + # between the two syscalls. + self.update_release_on_fh_close: dict[int, str] = {} super().__init__() @staticmethod @@ -653,7 +659,8 @@ def mkdir(self, p: VirtualPath) -> None: except ReleaseDoesNotExistError as e: err = e logger.debug( - f"Failed adding release {p.release} to collage {p.collage}: release not found" + f"LOGICAL: Failed adding release {p.release} to collage {p.collage}: " + "release not found" ) raise llfuse.FUSEError(errno.ENOENT) from err if p.playlist and p.file is None: @@ -735,7 +742,10 @@ def open(self, p: VirtualPath, flags: int) -> int: return os.open(str(release.cover_image_path), flags) for track in tracks: if track.virtual_filename == p.file: - return os.open(str(track.source_path), flags) + fh = os.open(str(track.source_path), flags) + if flags & os.O_WRONLY == os.O_WRONLY or flags & os.O_RDWR == os.O_RDWR: + self.update_release_on_fh_close[fh] = track.release_id + return fh raise llfuse.FUSEError(err) if p.playlist and p.file: try: @@ -746,7 +756,9 @@ def open(self, p: VirtualPath, flags: int) -> int: # operation sequence. See the __init__ for more details. if flags & os.O_CREAT == os.O_CREAT: fh = self.fhgen.next() - logger.debug(f"Begin playlist addition operation sequence for {p.file=} and {fh=}") + logger.debug( + f"LOGICAL: Begin playlist addition operation sequence for {p.file=} and {fh=}" + ) self.playlist_additions_in_progress[fh] = ( p.playlist, Path(p.file).suffix, @@ -757,7 +769,10 @@ def open(self, p: VirtualPath, flags: int) -> int: if p.file_position: for idx, track in enumerate(tracks): if track.virtual_filename == p.file and idx + 1 == int(p.file_position): - return os.open(str(track.source_path), flags) + fh = os.open(str(track.source_path), flags) + if flags & os.O_WRONLY == os.O_WRONLY or flags & os.O_RDWR == os.O_RDWR: + self.update_release_on_fh_close[fh] = track.release_id + return fh if playlist.cover_path and f"cover{playlist.cover_path.suffix}" == p.file: return os.open(playlist.cover_path, flags) raise llfuse.FUSEError(err) @@ -767,7 +782,7 @@ def open(self, p: VirtualPath, flags: int) -> int: def read(self, fh: int, offset: int, length: int) -> bytes: logger.debug(f"LOGICAL: Received read for {fh=} {offset=} {length=}") if pap := self.playlist_additions_in_progress.get(fh, None): - logger.debug("Matched read to an in-progress playlist addition") + logger.debug("LOGICAL: Matched read to an in-progress playlist addition") _, _, b = pap return b[offset : offset + length] os.lseek(fh, offset, os.SEEK_SET) @@ -776,7 +791,7 @@ def read(self, fh: int, offset: int, length: int) -> bytes: def write(self, fh: int, offset: int, data: bytes) -> int: logger.debug(f"LOGICAL: Received write for {fh=} {offset=} {len(data)=}") if pap := self.playlist_additions_in_progress.get(fh, None): - logger.debug("Matched write to an in-progress playlist addition") + logger.debug("LOGICAL: Matched write to an in-progress playlist addition") _, _, b = pap del b[offset:] b.extend(data) @@ -785,11 +800,12 @@ def write(self, fh: int, offset: int, data: bytes) -> int: return os.write(fh, data) def release(self, fh: int) -> None: + logger.debug(f"LOGICAL: Received release for {fh=}") if pap := self.playlist_additions_in_progress.get(fh, None): logger.debug("Matched release to an in-progress playlist addition") playlist, ext, b = pap if not b: - logger.debug("Aborting playlist addition release: no bytes to write") + logger.debug("LOGICAL: Aborting playlist addition release: no bytes to write") return with tempfile.TemporaryDirectory() as tmpdir: audiopath = Path(tmpdir) / f"f{ext}" @@ -799,20 +815,20 @@ def release(self, fh: int) -> None: track_id = audiofile.id if not track_id: logger.warning( - "Failed to parse track_id from file in playlist addition operation " + "LOGICAL: Failed to parse track_id from file in playlist addition operation " f"sequence: {track_id=} {fh=} {playlist=} {audiofile}" ) return add_track_to_playlist(self.config, playlist, track_id) del self.playlist_additions_in_progress[fh] return - logger.debug(f"FUSE: Received release for {fh=}") os.close(fh) - - def ftruncate(self, fh: int, length: int = 0) -> None: - # TODO: IN PROGRESS PLAYLIST ADDITION - logger.debug(f"FUSE: Received ftruncate for {fh=} {length=}") - return os.ftruncate(fh, length) + if release_id := self.update_release_on_fh_close.get(fh, None): + logger.debug( + f"LOGICAL: Triggering cache update for release {release_id} after release syscall" + ) + if source_path := get_release_source_path_from_id(self.config, release_id): + update_cache_for_releases(self.config, [source_path]) class INodeManager: @@ -950,7 +966,7 @@ def __init__(self, config: Config): maxsize=9999, ttl=2 ) self.ghost_writable_empty_directory: cachetools.TTLCache[str, bool] = cachetools.TTLCache( - maxsize=9999, ttl=2 + maxsize=9999, ttl=5 ) def reset_getattr_caches(self) -> None: diff --git a/rose/virtualfs_test.py b/rose/virtualfs_test.py index cc142fd..ab5200a 100644 --- a/rose/virtualfs_test.py +++ b/rose/virtualfs_test.py @@ -10,6 +10,7 @@ import pytest from rose.config import Config +from rose.tagger import AudioFile from rose.virtualfs import mount_virtualfs, unmount_virtualfs @@ -109,16 +110,30 @@ def can_read(p: Path) -> bool: assert can_read(root / "8. Playlists" / "Lala Lisa" / "cover.jpg") -@pytest.mark.usefixtures("seeded_cache") -def test_virtual_filesystem_write_files(config: Config) -> None: +def test_virtual_filesystem_write_files( + config: Config, + source_dir: Path, # noqa: ARG001 +) -> None: + """Assert that 1. we can write files and 2. cache updates in response.""" root = config.fuse_mount_dir + path = ( + root + / "1. Releases" + / "{NEW} BLACKPINK - 1990. I Love Blackpink [K-Pop;Pop]" + / "01. BLACKPINK - Track 1.m4a" + ) with start_virtual_fs(config): - with (root / "1. Releases" / "r1" / "01. 01.m4a").open("w") as fp: - fp.write("abc") - with (root / "1. Releases" / "r1" / "01. 01.m4a").open("r") as fp: - assert fp.read() == "abc" - with pytest.raises(OSError): # noqa: PT011 - (root / "1. Releases" / "r1" / "lalala").open("w") + # Write! + af = AudioFile.from_file(path) + assert af.title == "Track 1" + af.title = "Hahahaha!!" + af.flush() + # Read! File should have been renamed post-cache update. + assert not path.exists() + path = path.with_name("01. BLACKPINK - Hahahaha!!.m4a") + assert path.is_file() + af = AudioFile.from_file(path) + assert af.title == "Hahahaha!!" @pytest.mark.usefixtures("seeded_cache") @@ -207,6 +222,7 @@ def test_virtual_filesystem_playlist_actions( ], check=True, ) + time.sleep(0.05) assert (root / "8. Playlists" / "New Jeans" / "1. BLACKPINK - Track 1.m4a").is_file() with (src / "!playlists" / "New Jeans.toml").open("r") as fp: assert "BLACKPINK - Track 1.m4a" in fp.read()