From 303199b841e73491dffb345269dfc8e387f6dce6 Mon Sep 17 00:00:00 2001 From: blissful Date: Wed, 11 Oct 2023 11:45:16 -0400 Subject: [PATCH] factor a virtual path parser out of virtualfs --- rose/virtualfs/__init__.py | 296 ++++++++++++++++--------------------- 1 file changed, 126 insertions(+), 170 deletions(-) diff --git a/rose/virtualfs/__init__.py b/rose/virtualfs/__init__.py index 211c23a..81a4c8f 100644 --- a/rose/virtualfs/__init__.py +++ b/rose/virtualfs/__init__.py @@ -4,8 +4,9 @@ import stat import subprocess from collections.abc import Iterator +from dataclasses import dataclass from pathlib import Path -from typing import IO, Any, Literal +from typing import Any, Literal import fuse @@ -36,210 +37,165 @@ def __init__(self, config: Config): def getattr(self, path: str) -> fuse.Stat: logger.debug(f"Received getattr for {path}") + p = parse_virtual_path(path) + logger.debug(f"Parsed getattr path as {p}") - def mkstat(mode: Literal["dir", "file"], fsize: int = 4096) -> fuse.Stat: + def mkstat(mode: Literal["dir", "file"], file: Path | None = None) -> fuse.Stat: return fuse.Stat( st_nlink=1, st_mode=(stat.S_IFDIR | 0o755) if mode == "dir" else (stat.S_IFREG | 0o644), - st_size=fsize, + st_size=file.stat().st_size if file else 4096, st_uid=os.getuid(), st_gid=os.getgid(), ) - if path == "/": + if p.view == "root": return mkstat("dir") - - parts = path.split("/")[1:] # First part is always empty string. - - if parts[0] == "albums": - if len(parts) == 1: - return mkstat("dir") - if not release_exists(self.config, parts[1]): - raise OSError(errno.ENOENT, "No such file or directory") - if len(parts) == 2: - return mkstat("dir") - if len(parts) == 3 and (tp := track_exists(self.config, parts[1], parts[2])): - return mkstat("file", tp.stat().st_size) - raise OSError(errno.ENOENT, "No such file or directory") - - if parts[0] == "artists": - if len(parts) == 1: - return mkstat("dir") - if not artist_exists(self.config, parts[1]): - raise OSError(errno.ENOENT, "No such file or directory") - if len(parts) == 2: - return mkstat("dir") - if not release_exists(self.config, parts[2]): - raise OSError(errno.ENOENT, "No such file or directory") - if len(parts) == 3: + elif p.album and p.track: + if tp := track_exists(self.config, p.album, p.track): + return mkstat("file", tp) + elif p.album: + if rp := release_exists(self.config, p.album): + return mkstat("dir", rp) + elif p.artist: + if artist_exists(self.config, p.artist): return mkstat("dir") - if len(parts) == 4 and (tp := track_exists(self.config, parts[2], parts[3])): - return mkstat("file", tp.stat().st_size) - raise OSError(errno.ENOENT, "No such file or directory") - - if parts[0] == "genres": - if len(parts) == 1: + elif p.genre: + if genre_exists(self.config, p.genre): return mkstat("dir") - if not genre_exists(self.config, parts[1]): - raise OSError(errno.ENOENT, "No such file or directory") - if len(parts) == 2: + elif p.label: + if label_exists(self.config, p.label): return mkstat("dir") - if not release_exists(self.config, parts[2]): - raise OSError(errno.ENOENT, "No such file or directory") - if len(parts) == 3: - return mkstat("dir") - if len(parts) == 4 and (tp := track_exists(self.config, parts[2], parts[3])): - return mkstat("file", tp.stat().st_size) - raise OSError(errno.ENOENT, "No such file or directory") - - if parts[0] == "labels": - if len(parts) == 1: - return mkstat("dir") - if not label_exists(self.config, parts[1]): - raise OSError(errno.ENOENT, "No such file or directory") - if len(parts) == 2: - return mkstat("dir") - if not release_exists(self.config, parts[2]): - raise OSError(errno.ENOENT, "No such file or directory") - if len(parts) == 3: - return mkstat("dir") - if len(parts) == 4 and (tp := track_exists(self.config, parts[2], parts[3])): - return mkstat("file", tp.stat().st_size) - raise OSError(errno.ENOENT, "No such file or directory") + else: + return mkstat("dir") raise OSError(errno.ENOENT, "No such file or directory") def readdir(self, path: str, _: Any) -> Iterator[fuse.Direntry]: logger.debug(f"Received readdir for {path}") - if path == "/": + p = parse_virtual_path(path) + logger.debug(f"Parsed readdir path as {p}") + + yield from [fuse.Direntry("."), fuse.Direntry("..")] + + if p.view == "root": yield from [ - fuse.Direntry("."), - fuse.Direntry(".."), fuse.Direntry("albums"), fuse.Direntry("artists"), fuse.Direntry("genres"), fuse.Direntry("labels"), ] - return - - parts = path.split("/")[1:] # First part is always empty string. - - if parts[0] == "albums": - if len(parts) == 1: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for album in list_releases(self.config): - yield fuse.Direntry(album.virtual_dirname) - return - if len(parts) == 2: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for track in list_tracks(self.config, parts[1]): - yield fuse.Direntry(track.virtual_filename) - return - return - - if parts[0] == "artists": - if len(parts) == 1: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for artist in list_artists(self.config): - yield fuse.Direntry(sanitize_filename(artist)) - return - if len(parts) == 2: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for album in list_releases(self.config, sanitized_artist_filter=parts[1]): - yield fuse.Direntry(album.virtual_dirname) - return - if len(parts) == 3: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for track in list_tracks(self.config, parts[2]): - yield fuse.Direntry(track.virtual_filename) - return - return - - if parts[0] == "genres": - if len(parts) == 1: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for genre in list_genres(self.config): - yield fuse.Direntry(sanitize_filename(genre)) - return - if len(parts) == 2: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for album in list_releases(self.config, sanitized_genre_filter=parts[1]): - yield fuse.Direntry(album.virtual_dirname) - return - if len(parts) == 3: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for track in list_tracks(self.config, parts[2]): - yield fuse.Direntry(track.virtual_filename) - return - return - - if parts[0] == "labels": - if len(parts) == 1: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for label in list_labels(self.config): - yield fuse.Direntry(sanitize_filename(label)) - return - if len(parts) == 2: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for album in list_releases(self.config, sanitized_label_filter=parts[1]): - yield fuse.Direntry(album.virtual_dirname) - return - if len(parts) == 3: - yield from [fuse.Direntry("."), fuse.Direntry("..")] - for track in list_tracks(self.config, parts[2]): - yield fuse.Direntry(track.virtual_filename) - return - return + elif p.album: + for track in list_tracks(self.config, p.album): + yield fuse.Direntry(track.virtual_filename) + elif p.artist or p.genre or p.label or p.view == "albums": + for album in list_releases( + self.config, + sanitized_artist_filter=p.artist, + sanitized_genre_filter=p.genre, + sanitized_label_filter=p.label, + ): + yield fuse.Direntry(album.virtual_dirname) + elif p.view == "artists": + for artist in list_artists(self.config): + yield fuse.Direntry(sanitize_filename(artist)) + elif p.view == "genres": + for genre in list_genres(self.config): + yield fuse.Direntry(sanitize_filename(genre)) + elif p.view == "labels": + for label in list_labels(self.config): + yield fuse.Direntry(sanitize_filename(label)) + else: + raise OSError(errno.ENOENT, "No such file or directory") + + def read(self, path: str, size: int, offset: int) -> bytes: + logger.debug(f"Received read for {path=} {size=} {offset=}") + p = parse_virtual_path(path) + logger.debug(f"Parsed read path as {p}") + + if p.album and p.track: + for track in list_tracks(self.config, p.album): + if track.virtual_filename == p.track: + with track.source_path.open("rb") as fp: + fp.seek(offset) + return fp.read(size) raise OSError(errno.ENOENT, "No such file or directory") - def read(self, path: str, size: int, offset: int) -> bytes: - logger.debug(f"Received read for {path}") + def open(self, path: str, flags: int) -> None: + logger.debug(f"Received open for {path=} {flags=}") - def read_bytes(p: Path) -> bytes: - with p.open("rb") as fp: - fp.seek(offset) - return fp.read(size) + # Raise an ENOENT if the file does not exist. + self.getattr(path) - parts = path.split("/")[1:] # First part is always empty string. + # Read-only file system. + accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR + if (flags & accmode) != os.O_RDONLY: + raise OSError(errno.EACCES, "Access denied") - if parts[0] == "albums": - if len(parts) != 3: - raise OSError(errno.ENOENT, "No such file or directory") - for track in list_tracks(self.config, parts[1]): - if track.virtual_filename == parts[2]: - return read_bytes(track.source_path) - raise OSError(errno.ENOENT, "No such file or directory") - if parts[0] in ["artists", "genres", "labels"]: - if len(parts) != 4: - raise OSError(errno.ENOENT, "No such file or directory") - for track in list_tracks(self.config, parts[2]): - if track.virtual_filename == parts[3]: - return read_bytes(track.source_path) - raise OSError(errno.ENOENT, "No such file or directory") + return None + + +@dataclass +class ParsedPath: + view: Literal["root", "albums", "artists", "genres", "labels"] | None + artist: str | None = None + genre: str | None = None + label: str | None = None + album: str | None = None + track: str | None = None + + +def parse_virtual_path(path: str) -> ParsedPath: + parts = path.split("/")[1:] # First part is always empty string. + + if len(parts) == 1 and parts[0] == "": + return ParsedPath(view="root") + + if parts[0] == "albums": + if len(parts) == 1: + return ParsedPath(view="albums") + if len(parts) == 2: + return ParsedPath(view="albums", album=parts[1]) + if len(parts) == 3: + return ParsedPath(view="albums", album=parts[1], track=parts[2]) raise OSError(errno.ENOENT, "No such file or directory") - def open(self, path: str, flags: str) -> IO[Any]: - logger.debug(f"Received open for {path}") + if parts[0] == "artists": + if len(parts) == 1: + return ParsedPath(view="artists") + if len(parts) == 2: + return ParsedPath(view="artists", artist=parts[1]) + if len(parts) == 3: + return ParsedPath(view="artists", artist=parts[1], album=parts[2]) + if len(parts) == 4: + return ParsedPath(view="artists", artist=parts[1], album=parts[2], track=parts[3]) + raise OSError(errno.ENOENT, "No such file or directory") - parts = path.split("/")[1:] # First part is always empty string. + if parts[0] == "genres": + if len(parts) == 1: + return ParsedPath(view="genres") + if len(parts) == 2: + return ParsedPath(view="genres", genre=parts[1]) + if len(parts) == 3: + return ParsedPath(view="genres", genre=parts[1], album=parts[2]) + if len(parts) == 4: + return ParsedPath(view="genres", genre=parts[1], album=parts[2], track=parts[3]) + raise OSError(errno.ENOENT, "No such file or directory") - if parts[0] == "albums": - if len(parts) != 3: - raise OSError(errno.ENOENT, "No such file or directory") - for track in list_tracks(self.config, parts[1]): - if track.virtual_filename == parts[2]: - return track.source_path.open(flags) - raise OSError(errno.ENOENT, "No such file or directory") - if parts[0] in ["artists", "genres", "labels"]: - if len(parts) != 4: - raise OSError(errno.ENOENT, "No such file or directory") - for track in list_tracks(self.config, parts[2]): - if track.virtual_filename == parts[3]: - return track.source_path.open(flags) - raise OSError(errno.ENOENT, "No such file or directory") + if parts[0] == "labels": + if len(parts) == 1: + return ParsedPath(view="labels") + if len(parts) == 2: + return ParsedPath(view="labels", label=parts[1]) + if len(parts) == 3: + return ParsedPath(view="labels", label=parts[1], album=parts[2]) + if len(parts) == 4: + return ParsedPath(view="labels", label=parts[1], album=parts[2], track=parts[3]) raise OSError(errno.ENOENT, "No such file or directory") + raise OSError(errno.ENOENT, "No such file or directory") + def mount_virtualfs(c: Config, mount_args: list[str]) -> None: server = VirtualFS(c)