Skip to content

Commit

Permalink
refresh cache after closing a write through virtualfs; fix multiproce…
Browse files Browse the repository at this point in the history
…ssing.set_start_method breaking xdist
  • Loading branch information
azuline committed Oct 29, 2023
1 parent 11653d3 commit 6c5227e
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 68 deletions.
3 changes: 3 additions & 0 deletions rose/cli_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import multiprocessing
import os
from typing import Any

Expand Down Expand Up @@ -34,6 +35,8 @@ def mock_exit(x: int) -> None:
raise SystemExit(x)

monkeypatch.setattr(os, "_exit", mock_exit)
# set_start_method borks a bit in tests; it's not important in this test anyways.
monkeypatch.setattr(multiprocessing, "set_start_method", lambda _: None)

ctx = Context(config=config)
runner = CliRunner()
Expand Down
86 changes: 43 additions & 43 deletions rose/tagger_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,27 @@
],
)
def test_getters(filename: str, track_num: str, duration: int) -> None:
tf = AudioFile.from_file(TEST_TAGGER / filename)
assert tf.track_number == track_num
assert tf.title == f"Track {track_num}"

assert tf.album == "A Cool Album"
assert tf.release_type == "album"
assert tf.year == 1990
assert tf.disc_number == "1"
assert tf.genre == ["Electronic", "House"]
assert tf.label == ["A Cool Label"]

assert tf.album_artists.main == ["Artist A", "Artist B"]
assert tf.artists == ArtistMapping(
af = AudioFile.from_file(TEST_TAGGER / filename)
assert af.track_number == track_num
assert af.title == f"Track {track_num}"

assert af.album == "A Cool Album"
assert af.release_type == "album"
assert af.year == 1990
assert af.disc_number == "1"
assert af.genre == ["Electronic", "House"]
assert af.label == ["A Cool Label"]

assert af.album_artists.main == ["Artist A", "Artist B"]
assert af.artists == ArtistMapping(
main=["Artist GH", "Artist HI"],
guest=["Artist C", "Artist A"],
remixer=["Artist AB", "Artist BC"],
producer=["Artist CD", "Artist DE"],
composer=["Artist EF", "Artist FG"],
djmixer=["Artist IJ", "Artist JK"],
)
assert tf.duration_sec == duration
assert af.duration_sec == duration


@pytest.mark.parametrize(
Expand All @@ -60,33 +60,33 @@ def test_flush(isolated_dir: Path, filename: str, track_num: str, duration: int)
"""Test the flush by flushing the file, then asserting that all the tags still read properly."""
fpath = isolated_dir / filename
shutil.copyfile(TEST_TAGGER / filename, fpath)
tf = AudioFile.from_file(fpath)
af = AudioFile.from_file(fpath)
# Inject one special case into here: modify the djmixer artist. This checks that we also clear
# the original djmixer tag, so that the next read does not contain Artist EF and Artist FG.
tf.artists.djmixer = ["New"]
tf.flush()
tf = AudioFile.from_file(fpath)

assert tf.track_number == track_num
assert tf.title == f"Track {track_num}"

assert tf.album == "A Cool Album"
assert tf.release_type == "album"
assert tf.year == 1990
assert tf.disc_number == "1"
assert tf.genre == ["Electronic", "House"]
assert tf.label == ["A Cool Label"]

assert tf.album_artists.main == ["Artist A", "Artist B"]
assert tf.artists == ArtistMapping(
af.artists.djmixer = ["New"]
af.flush()
af = AudioFile.from_file(fpath)

assert af.track_number == track_num
assert af.title == f"Track {track_num}"

assert af.album == "A Cool Album"
assert af.release_type == "album"
assert af.year == 1990
assert af.disc_number == "1"
assert af.genre == ["Electronic", "House"]
assert af.label == ["A Cool Label"]

assert af.album_artists.main == ["Artist A", "Artist B"]
assert af.artists == ArtistMapping(
main=["Artist GH", "Artist HI"],
guest=["Artist C", "Artist A"],
remixer=["Artist AB", "Artist BC"],
producer=["Artist CD", "Artist DE"],
composer=[], # Composer gets wiped because we're not of the classical genre :-)
djmixer=["New"],
)
assert tf.duration_sec == duration
assert af.duration_sec == duration


@pytest.mark.parametrize(
Expand All @@ -99,22 +99,22 @@ def test_release_type_normalization(isolated_dir: Path, filename: str) -> None:
shutil.copyfile(TEST_TAGGER / filename, fpath)

# Check that release type is read correctly.
tf = AudioFile.from_file(fpath)
assert tf.release_type == "album"
af = AudioFile.from_file(fpath)
assert af.release_type == "album"
# Assert that attempting to flush a stupid value fails.
tf.release_type = "lalala"
af.release_type = "lalala"
with pytest.raises(UnsupportedTagValueTypeError):
tf.flush()
af.flush()
# Flush it anyways...
tf.flush(validate=False)
af.flush(validate=False)
# Check that stupid release type is normalized as unknown.
tf = AudioFile.from_file(fpath)
assert tf.release_type == "unknown"
af = AudioFile.from_file(fpath)
assert af.release_type == "unknown"
# And now assert that the read is case insensitive.
tf.release_type = "ALBUM"
tf.flush(validate=False)
tf = AudioFile.from_file(fpath)
assert tf.release_type == "album"
af.release_type = "ALBUM"
af.flush(validate=False)
af = AudioFile.from_file(fpath)
assert af.release_type == "album"


def test_split_tag() -> None:
Expand Down
50 changes: 33 additions & 17 deletions rose/virtualfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
genre_exists,
get_playlist,
get_release,
get_release_source_path_from_id,
label_exists,
list_artists,
list_collage_releases,
Expand All @@ -63,6 +64,7 @@
list_releases,
release_exists,
track_exists,
update_cache_for_releases,
)
from rose.collages import (
add_release_to_collage,
Expand Down Expand Up @@ -352,7 +354,7 @@ def next(self) -> int:
class RoseLogicalFS:
def __init__(self, config: Config, fhgen: FileHandleGenerator):
self.config = config
# We use this object to determine whether we should show an artist/genre/label
self.fhgen = fhgen
self.can_show = CanShower(config)
# We implement the "add track to playlist" operation in a slightly special way. Unlike
# releases, where the virtual dirname is globally unique, track filenames are not globally
Expand All @@ -373,7 +375,11 @@ def __init__(self, config: Config, fhgen: FileHandleGenerator):
#
# The state is a mapping of fh -> (playlist_name, ext, bytes).
self.playlist_additions_in_progress: dict[int, tuple[str, str, bytearray]] = {}
self.fhgen = fhgen
# We want to trigger a cache update whenever we notice that a file has been updated through
# the virtual filesystem. To do this, we insert the file handle and release ID on open, and
# then trigger the cache update on release. We use this variable to transport that state
# between the two syscalls.
self.update_release_on_fh_close: dict[int, str] = {}
super().__init__()

@staticmethod
Expand Down Expand Up @@ -653,7 +659,8 @@ def mkdir(self, p: VirtualPath) -> None:
except ReleaseDoesNotExistError as e:
err = e
logger.debug(
f"Failed adding release {p.release} to collage {p.collage}: release not found"
f"LOGICAL: Failed adding release {p.release} to collage {p.collage}: "
"release not found"
)
raise llfuse.FUSEError(errno.ENOENT) from err
if p.playlist and p.file is None:
Expand Down Expand Up @@ -735,7 +742,10 @@ def open(self, p: VirtualPath, flags: int) -> int:
return os.open(str(release.cover_image_path), flags)
for track in tracks:
if track.virtual_filename == p.file:
return os.open(str(track.source_path), flags)
fh = os.open(str(track.source_path), flags)
if flags & os.O_WRONLY == os.O_WRONLY or flags & os.O_RDWR == os.O_RDWR:
self.update_release_on_fh_close[fh] = track.release_id
return fh
raise llfuse.FUSEError(err)
if p.playlist and p.file:
try:
Expand All @@ -746,7 +756,9 @@ def open(self, p: VirtualPath, flags: int) -> int:
# operation sequence. See the __init__ for more details.
if flags & os.O_CREAT == os.O_CREAT:
fh = self.fhgen.next()
logger.debug(f"Begin playlist addition operation sequence for {p.file=} and {fh=}")
logger.debug(
f"LOGICAL: Begin playlist addition operation sequence for {p.file=} and {fh=}"
)
self.playlist_additions_in_progress[fh] = (
p.playlist,
Path(p.file).suffix,
Expand All @@ -757,7 +769,10 @@ def open(self, p: VirtualPath, flags: int) -> int:
if p.file_position:
for idx, track in enumerate(tracks):
if track.virtual_filename == p.file and idx + 1 == int(p.file_position):
return os.open(str(track.source_path), flags)
fh = os.open(str(track.source_path), flags)
if flags & os.O_WRONLY == os.O_WRONLY or flags & os.O_RDWR == os.O_RDWR:
self.update_release_on_fh_close[fh] = track.release_id
return fh
if playlist.cover_path and f"cover{playlist.cover_path.suffix}" == p.file:
return os.open(playlist.cover_path, flags)
raise llfuse.FUSEError(err)
Expand All @@ -767,7 +782,7 @@ def open(self, p: VirtualPath, flags: int) -> int:
def read(self, fh: int, offset: int, length: int) -> bytes:
logger.debug(f"LOGICAL: Received read for {fh=} {offset=} {length=}")
if pap := self.playlist_additions_in_progress.get(fh, None):
logger.debug("Matched read to an in-progress playlist addition")
logger.debug("LOGICAL: Matched read to an in-progress playlist addition")
_, _, b = pap
return b[offset : offset + length]
os.lseek(fh, offset, os.SEEK_SET)
Expand All @@ -776,7 +791,7 @@ def read(self, fh: int, offset: int, length: int) -> bytes:
def write(self, fh: int, offset: int, data: bytes) -> int:
logger.debug(f"LOGICAL: Received write for {fh=} {offset=} {len(data)=}")
if pap := self.playlist_additions_in_progress.get(fh, None):
logger.debug("Matched write to an in-progress playlist addition")
logger.debug("LOGICAL: Matched write to an in-progress playlist addition")
_, _, b = pap
del b[offset:]
b.extend(data)
Expand All @@ -785,11 +800,12 @@ def write(self, fh: int, offset: int, data: bytes) -> int:
return os.write(fh, data)

def release(self, fh: int) -> None:
logger.debug(f"LOGICAL: Received release for {fh=}")
if pap := self.playlist_additions_in_progress.get(fh, None):
logger.debug("Matched release to an in-progress playlist addition")
playlist, ext, b = pap
if not b:
logger.debug("Aborting playlist addition release: no bytes to write")
logger.debug("LOGICAL: Aborting playlist addition release: no bytes to write")
return
with tempfile.TemporaryDirectory() as tmpdir:
audiopath = Path(tmpdir) / f"f{ext}"
Expand All @@ -799,20 +815,20 @@ def release(self, fh: int) -> None:
track_id = audiofile.id
if not track_id:
logger.warning(
"Failed to parse track_id from file in playlist addition operation "
"LOGICAL: Failed to parse track_id from file in playlist addition operation "
f"sequence: {track_id=} {fh=} {playlist=} {audiofile}"
)
return
add_track_to_playlist(self.config, playlist, track_id)
del self.playlist_additions_in_progress[fh]
return
logger.debug(f"FUSE: Received release for {fh=}")
os.close(fh)

def ftruncate(self, fh: int, length: int = 0) -> None:
# TODO: IN PROGRESS PLAYLIST ADDITION
logger.debug(f"FUSE: Received ftruncate for {fh=} {length=}")
return os.ftruncate(fh, length)
if release_id := self.update_release_on_fh_close.get(fh, None):
logger.debug(
f"LOGICAL: Triggering cache update for release {release_id} after release syscall"
)
if source_path := get_release_source_path_from_id(self.config, release_id):
update_cache_for_releases(self.config, [source_path])


class INodeManager:
Expand Down Expand Up @@ -950,7 +966,7 @@ def __init__(self, config: Config):
maxsize=9999, ttl=2
)
self.ghost_writable_empty_directory: cachetools.TTLCache[str, bool] = cachetools.TTLCache(
maxsize=9999, ttl=2
maxsize=9999, ttl=5
)

def reset_getattr_caches(self) -> None:
Expand Down
32 changes: 24 additions & 8 deletions rose/virtualfs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest

from rose.config import Config
from rose.tagger import AudioFile
from rose.virtualfs import mount_virtualfs, unmount_virtualfs


Expand Down Expand Up @@ -109,16 +110,30 @@ def can_read(p: Path) -> bool:
assert can_read(root / "8. Playlists" / "Lala Lisa" / "cover.jpg")


@pytest.mark.usefixtures("seeded_cache")
def test_virtual_filesystem_write_files(config: Config) -> None:
def test_virtual_filesystem_write_files(
config: Config,
source_dir: Path, # noqa: ARG001
) -> None:
"""Assert that 1. we can write files and 2. cache updates in response."""
root = config.fuse_mount_dir
path = (
root
/ "1. Releases"
/ "{NEW} BLACKPINK - 1990. I Love Blackpink [K-Pop;Pop]"
/ "01. BLACKPINK - Track 1.m4a"
)
with start_virtual_fs(config):
with (root / "1. Releases" / "r1" / "01. 01.m4a").open("w") as fp:
fp.write("abc")
with (root / "1. Releases" / "r1" / "01. 01.m4a").open("r") as fp:
assert fp.read() == "abc"
with pytest.raises(OSError): # noqa: PT011
(root / "1. Releases" / "r1" / "lalala").open("w")
# Write!
af = AudioFile.from_file(path)
assert af.title == "Track 1"
af.title = "Hahahaha!!"
af.flush()
# Read! File should have been renamed post-cache update.
assert not path.exists()
path = path.with_name("01. BLACKPINK - Hahahaha!!.m4a")
assert path.is_file()
af = AudioFile.from_file(path)
assert af.title == "Hahahaha!!"


@pytest.mark.usefixtures("seeded_cache")
Expand Down Expand Up @@ -207,6 +222,7 @@ def test_virtual_filesystem_playlist_actions(
],
check=True,
)
time.sleep(0.05)
assert (root / "8. Playlists" / "New Jeans" / "1. BLACKPINK - Track 1.m4a").is_file()
with (src / "!playlists" / "New Jeans.toml").open("r") as fp:
assert "BLACKPINK - Track 1.m4a" in fp.read()
Expand Down

0 comments on commit 6c5227e

Please sign in to comment.