Skip to content

Commit

Permalink
factor a virtual path parser out of virtualfs
Browse files Browse the repository at this point in the history
  • Loading branch information
azuline committed Oct 11, 2023
1 parent ce2452f commit 303199b
Showing 1 changed file with 126 additions and 170 deletions.
296 changes: 126 additions & 170 deletions rose/virtualfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import stat
import subprocess
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import IO, Any, Literal
from typing import Any, Literal

import fuse

Expand Down Expand Up @@ -36,210 +37,165 @@ def __init__(self, config: Config):

def getattr(self, path: str) -> fuse.Stat:
logger.debug(f"Received getattr for {path}")
p = parse_virtual_path(path)
logger.debug(f"Parsed getattr path as {p}")

def mkstat(mode: Literal["dir", "file"], fsize: int = 4096) -> fuse.Stat:
def mkstat(mode: Literal["dir", "file"], file: Path | None = None) -> fuse.Stat:
return fuse.Stat(
st_nlink=1,
st_mode=(stat.S_IFDIR | 0o755) if mode == "dir" else (stat.S_IFREG | 0o644),
st_size=fsize,
st_size=file.stat().st_size if file else 4096,
st_uid=os.getuid(),
st_gid=os.getgid(),
)

if path == "/":
if p.view == "root":
return mkstat("dir")

parts = path.split("/")[1:] # First part is always empty string.

if parts[0] == "albums":
if len(parts) == 1:
return mkstat("dir")
if not release_exists(self.config, parts[1]):
raise OSError(errno.ENOENT, "No such file or directory")
if len(parts) == 2:
return mkstat("dir")
if len(parts) == 3 and (tp := track_exists(self.config, parts[1], parts[2])):
return mkstat("file", tp.stat().st_size)
raise OSError(errno.ENOENT, "No such file or directory")

if parts[0] == "artists":
if len(parts) == 1:
return mkstat("dir")
if not artist_exists(self.config, parts[1]):
raise OSError(errno.ENOENT, "No such file or directory")
if len(parts) == 2:
return mkstat("dir")
if not release_exists(self.config, parts[2]):
raise OSError(errno.ENOENT, "No such file or directory")
if len(parts) == 3:
elif p.album and p.track:
if tp := track_exists(self.config, p.album, p.track):
return mkstat("file", tp)
elif p.album:
if rp := release_exists(self.config, p.album):
return mkstat("dir", rp)
elif p.artist:
if artist_exists(self.config, p.artist):
return mkstat("dir")
if len(parts) == 4 and (tp := track_exists(self.config, parts[2], parts[3])):
return mkstat("file", tp.stat().st_size)
raise OSError(errno.ENOENT, "No such file or directory")

if parts[0] == "genres":
if len(parts) == 1:
elif p.genre:
if genre_exists(self.config, p.genre):
return mkstat("dir")
if not genre_exists(self.config, parts[1]):
raise OSError(errno.ENOENT, "No such file or directory")
if len(parts) == 2:
elif p.label:
if label_exists(self.config, p.label):
return mkstat("dir")
if not release_exists(self.config, parts[2]):
raise OSError(errno.ENOENT, "No such file or directory")
if len(parts) == 3:
return mkstat("dir")
if len(parts) == 4 and (tp := track_exists(self.config, parts[2], parts[3])):
return mkstat("file", tp.stat().st_size)
raise OSError(errno.ENOENT, "No such file or directory")

if parts[0] == "labels":
if len(parts) == 1:
return mkstat("dir")
if not label_exists(self.config, parts[1]):
raise OSError(errno.ENOENT, "No such file or directory")
if len(parts) == 2:
return mkstat("dir")
if not release_exists(self.config, parts[2]):
raise OSError(errno.ENOENT, "No such file or directory")
if len(parts) == 3:
return mkstat("dir")
if len(parts) == 4 and (tp := track_exists(self.config, parts[2], parts[3])):
return mkstat("file", tp.stat().st_size)
raise OSError(errno.ENOENT, "No such file or directory")
else:
return mkstat("dir")

raise OSError(errno.ENOENT, "No such file or directory")

def readdir(self, path: str, _: Any) -> Iterator[fuse.Direntry]:
logger.debug(f"Received readdir for {path}")
if path == "/":
p = parse_virtual_path(path)
logger.debug(f"Parsed readdir path as {p}")

yield from [fuse.Direntry("."), fuse.Direntry("..")]

if p.view == "root":
yield from [
fuse.Direntry("."),
fuse.Direntry(".."),
fuse.Direntry("albums"),
fuse.Direntry("artists"),
fuse.Direntry("genres"),
fuse.Direntry("labels"),
]
return

parts = path.split("/")[1:] # First part is always empty string.

if parts[0] == "albums":
if len(parts) == 1:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for album in list_releases(self.config):
yield fuse.Direntry(album.virtual_dirname)
return
if len(parts) == 2:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for track in list_tracks(self.config, parts[1]):
yield fuse.Direntry(track.virtual_filename)
return
return

if parts[0] == "artists":
if len(parts) == 1:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for artist in list_artists(self.config):
yield fuse.Direntry(sanitize_filename(artist))
return
if len(parts) == 2:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for album in list_releases(self.config, sanitized_artist_filter=parts[1]):
yield fuse.Direntry(album.virtual_dirname)
return
if len(parts) == 3:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for track in list_tracks(self.config, parts[2]):
yield fuse.Direntry(track.virtual_filename)
return
return

if parts[0] == "genres":
if len(parts) == 1:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for genre in list_genres(self.config):
yield fuse.Direntry(sanitize_filename(genre))
return
if len(parts) == 2:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for album in list_releases(self.config, sanitized_genre_filter=parts[1]):
yield fuse.Direntry(album.virtual_dirname)
return
if len(parts) == 3:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for track in list_tracks(self.config, parts[2]):
yield fuse.Direntry(track.virtual_filename)
return
return

if parts[0] == "labels":
if len(parts) == 1:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for label in list_labels(self.config):
yield fuse.Direntry(sanitize_filename(label))
return
if len(parts) == 2:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for album in list_releases(self.config, sanitized_label_filter=parts[1]):
yield fuse.Direntry(album.virtual_dirname)
return
if len(parts) == 3:
yield from [fuse.Direntry("."), fuse.Direntry("..")]
for track in list_tracks(self.config, parts[2]):
yield fuse.Direntry(track.virtual_filename)
return
return
elif p.album:
for track in list_tracks(self.config, p.album):
yield fuse.Direntry(track.virtual_filename)
elif p.artist or p.genre or p.label or p.view == "albums":
for album in list_releases(
self.config,
sanitized_artist_filter=p.artist,
sanitized_genre_filter=p.genre,
sanitized_label_filter=p.label,
):
yield fuse.Direntry(album.virtual_dirname)
elif p.view == "artists":
for artist in list_artists(self.config):
yield fuse.Direntry(sanitize_filename(artist))
elif p.view == "genres":
for genre in list_genres(self.config):
yield fuse.Direntry(sanitize_filename(genre))
elif p.view == "labels":
for label in list_labels(self.config):
yield fuse.Direntry(sanitize_filename(label))
else:
raise OSError(errno.ENOENT, "No such file or directory")

def read(self, path: str, size: int, offset: int) -> bytes:
logger.debug(f"Received read for {path=} {size=} {offset=}")
p = parse_virtual_path(path)
logger.debug(f"Parsed read path as {p}")

if p.album and p.track:
for track in list_tracks(self.config, p.album):
if track.virtual_filename == p.track:
with track.source_path.open("rb") as fp:
fp.seek(offset)
return fp.read(size)

raise OSError(errno.ENOENT, "No such file or directory")

def read(self, path: str, size: int, offset: int) -> bytes:
logger.debug(f"Received read for {path}")
def open(self, path: str, flags: int) -> None:
logger.debug(f"Received open for {path=} {flags=}")

def read_bytes(p: Path) -> bytes:
with p.open("rb") as fp:
fp.seek(offset)
return fp.read(size)
# Raise an ENOENT if the file does not exist.
self.getattr(path)

parts = path.split("/")[1:] # First part is always empty string.
# Read-only file system.
accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
if (flags & accmode) != os.O_RDONLY:
raise OSError(errno.EACCES, "Access denied")

if parts[0] == "albums":
if len(parts) != 3:
raise OSError(errno.ENOENT, "No such file or directory")
for track in list_tracks(self.config, parts[1]):
if track.virtual_filename == parts[2]:
return read_bytes(track.source_path)
raise OSError(errno.ENOENT, "No such file or directory")
if parts[0] in ["artists", "genres", "labels"]:
if len(parts) != 4:
raise OSError(errno.ENOENT, "No such file or directory")
for track in list_tracks(self.config, parts[2]):
if track.virtual_filename == parts[3]:
return read_bytes(track.source_path)
raise OSError(errno.ENOENT, "No such file or directory")
return None


@dataclass
class ParsedPath:
view: Literal["root", "albums", "artists", "genres", "labels"] | None
artist: str | None = None
genre: str | None = None
label: str | None = None
album: str | None = None
track: str | None = None


def parse_virtual_path(path: str) -> ParsedPath:
parts = path.split("/")[1:] # First part is always empty string.

if len(parts) == 1 and parts[0] == "":
return ParsedPath(view="root")

if parts[0] == "albums":
if len(parts) == 1:
return ParsedPath(view="albums")
if len(parts) == 2:
return ParsedPath(view="albums", album=parts[1])
if len(parts) == 3:
return ParsedPath(view="albums", album=parts[1], track=parts[2])
raise OSError(errno.ENOENT, "No such file or directory")

def open(self, path: str, flags: str) -> IO[Any]:
logger.debug(f"Received open for {path}")
if parts[0] == "artists":
if len(parts) == 1:
return ParsedPath(view="artists")
if len(parts) == 2:
return ParsedPath(view="artists", artist=parts[1])
if len(parts) == 3:
return ParsedPath(view="artists", artist=parts[1], album=parts[2])
if len(parts) == 4:
return ParsedPath(view="artists", artist=parts[1], album=parts[2], track=parts[3])
raise OSError(errno.ENOENT, "No such file or directory")

parts = path.split("/")[1:] # First part is always empty string.
if parts[0] == "genres":
if len(parts) == 1:
return ParsedPath(view="genres")
if len(parts) == 2:
return ParsedPath(view="genres", genre=parts[1])
if len(parts) == 3:
return ParsedPath(view="genres", genre=parts[1], album=parts[2])
if len(parts) == 4:
return ParsedPath(view="genres", genre=parts[1], album=parts[2], track=parts[3])
raise OSError(errno.ENOENT, "No such file or directory")

if parts[0] == "albums":
if len(parts) != 3:
raise OSError(errno.ENOENT, "No such file or directory")
for track in list_tracks(self.config, parts[1]):
if track.virtual_filename == parts[2]:
return track.source_path.open(flags)
raise OSError(errno.ENOENT, "No such file or directory")
if parts[0] in ["artists", "genres", "labels"]:
if len(parts) != 4:
raise OSError(errno.ENOENT, "No such file or directory")
for track in list_tracks(self.config, parts[2]):
if track.virtual_filename == parts[3]:
return track.source_path.open(flags)
raise OSError(errno.ENOENT, "No such file or directory")
if parts[0] == "labels":
if len(parts) == 1:
return ParsedPath(view="labels")
if len(parts) == 2:
return ParsedPath(view="labels", label=parts[1])
if len(parts) == 3:
return ParsedPath(view="labels", label=parts[1], album=parts[2])
if len(parts) == 4:
return ParsedPath(view="labels", label=parts[1], album=parts[2], track=parts[3])
raise OSError(errno.ENOENT, "No such file or directory")

raise OSError(errno.ENOENT, "No such file or directory")


def mount_virtualfs(c: Config, mount_args: list[str]) -> None:
server = VirtualFS(c)
Expand Down

0 comments on commit 303199b

Please sign in to comment.