Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pathlib instead of beets's path handling #96

Merged
merged 1 commit into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Change Log
==========

## Upcoming
* Consistently use Unicode paths to alternative items. This may result and
collection updates and orphaned files in alternatives. It may also improve
usability on non-standard file systems (see [#74]).

[#74]: https://github.com/geigerzaehler/beets-alternatives/issues/74

## v0.12.0 - 2024-06-25
* Fix an issue where items in a symlink collection with relative links were
always unnecessarily updated.
Expand Down
184 changes: 77 additions & 107 deletions beetsplug/alternatives.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,24 @@
import argparse
import logging
import os.path
import shutil
import threading
import traceback
from concurrent import futures
from enum import Enum
from typing import Callable, Iterable, Iterator, Optional, Sequence, Set, Tuple, cast
from pathlib import Path
from typing import Callable, Iterable, Iterator, Optional, Sequence, Set, Tuple

import beets
import confuse
from beets import art, util
from beets.library import Item, Library, parse_query_string
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, UserError, decargs, get_path_formats, input_yn, print_
from beets.util import FilesystemError, bytestring_path, displayable_path, syspath
from typing_extensions import Never, override

import beetsplug.convert as convert


def _remove(path_: bytes, soft: bool = True):
"""Remove the file. If `soft`, then no error will be raised if the
file does not exist.
In contrast to beets' util.remove, this uses lexists such that it can
actually remove symlink links.
"""
path = syspath(path_)
if soft and not os.path.lexists(path):
return
try:
os.remove(path)
except OSError as exc:
raise FilesystemError(exc, "delete", (path,), traceback.format_exc()) from exc


class AlternativesPlugin(BeetsPlugin):
def __init__(self):
super().__init__()
Expand Down Expand Up @@ -198,52 +183,49 @@ def parse_config(self, config: confuse.ConfigView):
self.removable = config.get(dict).get("removable", True) # type: ignore

if "directory" in config:
dir = config["directory"].as_str()
assert isinstance(dir, str)
dir = config["directory"].as_path()
assert isinstance(dir, Path)
else:
dir = self.name
dir = bytestring_path(dir)
if not os.path.isabs(syspath(dir)):
dir = os.path.join(self.lib.directory, dir)
dir = Path(self.name)
if not dir.is_absolute():
dir = Path(str(self.lib.directory, "utf8")) / dir # type: ignore
self.directory = dir

def item_change_actions(
self, item: Item, path: bytes, dest: bytes
self, item: Item, actual: Path, dest: Path
) -> Sequence[Action]:
"""Returns the necessary actions for items that were previously in the
external collection, but might require metadata updates.
"""
actions = []

if not util.samefile(path, dest):
if actual != dest:
actions.append(Action.MOVE)

item_mtime_alt = os.path.getmtime(syspath(path))
if item_mtime_alt < os.path.getmtime(syspath(item.path)):
item_mtime_alt = actual.stat().st_mtime
if item_mtime_alt < Path(str(item.path, "utf8")).stat().st_mtime:
actions.append(Action.WRITE)
album = item.get_album()

if (
album
and album.artpath
and os.path.isfile(syspath(album.artpath))
and (item_mtime_alt < os.path.getmtime(syspath(album.artpath)))
and Path(str(album.artpath, "utf8")).is_file()
and (item_mtime_alt < Path(str(album.artpath, "utf8")).stat().st_mtime)
):
actions.append(Action.SYNC_ART)

return actions

def _matched_item_action(self, item: Item) -> Sequence[Action]:
path = self._get_stored_path(item)
if path and os.path.lexists(syspath(path)):
actual = self._get_stored_path(item)
if actual and (actual.is_file() or actual.is_symlink()):
dest = self.destination(item)
_, path_ext = os.path.splitext(path)
_, dest_ext = os.path.splitext(dest)
if path_ext != dest_ext:
if actual.suffix == dest.suffix:
return self.item_change_actions(item, actual, dest)
else:
# formats config option changed
return [Action.REMOVE, Action.ADD]
else:
return self.item_change_actions(item, path, dest)
else:
return [Action.ADD]

Expand All @@ -267,15 +249,15 @@ def ask_create(self, create: Optional[bool] = None) -> bool:
return create

msg = (
f"Collection at '{displayable_path(self.directory)}' does not exists. "
f"Collection at '{self.directory}' does not exists. "
"Maybe you forgot to mount it.\n"
"Do you want to create the collection? (y/n)"
)
return input_yn(msg, require=True)

def update(self, create: Optional[bool] = None):
if not os.path.isdir(syspath(self.directory)) and not self.ask_create(create):
print_(f"Skipping creation of {displayable_path(self.directory)}")
if not self.directory.is_dir() and not self.ask_create(create):
print_(f"Skipping creation of {self.directory}")
return

converter = self._converter()
Expand All @@ -285,32 +267,27 @@ def update(self, create: Optional[bool] = None):
for action in actions:
if action == Action.MOVE:
assert path is not None # action guarantees that `path` is not none
print_(f">{displayable_path(path)} -> {displayable_path(dest)}")
util.mkdirall(dest)
util.move(path, dest)
util.prune_dirs(
# Although the types for `prune_dirs()` require a `str`
# argument the function accepts a `bytes` argument.
cast(str, os.path.dirname(path)),
root=self.directory,
)
print_(f">{path} -> {dest}")
dest.parent.mkdir(parents=True, exist_ok=True)
path.rename(dest)
util.prune_dirs(str(path.parent), root=str(self.directory))
self._set_stored_path(item, dest)
item.store()
path = dest
elif action == Action.WRITE:
assert path is not None # action guarantees that `path` is not none
print_(f"*{displayable_path(path)}")
item.write(path=path)
print_(f"*{path}")
item.write(path=bytes(path))
elif action == Action.SYNC_ART:
assert path is not None # action guarantees that `path` is not none
print_(f"~{displayable_path(path)}")
print_(f"~{path}")
assert path is not None
self._sync_art(item, path)
elif action == Action.ADD:
print_(f"+{displayable_path(dest)}")
print_(f"+{dest}")
converter.run(item)
elif action == Action.REMOVE:
assert path is not None # action guarantees that `path` is not none
print_(f"-{displayable_path(path)}")
print_(f"-{path}")
self._remove_file(item)
item.store()

Expand All @@ -319,55 +296,51 @@ def update(self, create: Optional[bool] = None):
item.store()
converter.shutdown()

def destination(self, item: Item) -> bytes:
def destination(self, item: Item) -> Path:
"""Returns the path for `item` in the external collection."""
path = item.destination(basedir=self.directory, path_formats=self.path_formats)
assert isinstance(path, bytes)
return path
path = item.destination(path_formats=self.path_formats, fragment=True)
# When using `fragment=True` the returned path is guaranteed to be a
# string.
assert isinstance(path, str)
return self.directory / path

def _set_stored_path(self, item: Item, path: bytes):
item[self.path_key] = str(path, "utf8")
def _set_stored_path(self, item: Item, path: Path):
item[self.path_key] = str(path)

def _get_stored_path(self, item: Item) -> Optional[bytes]:
def _get_stored_path(self, item: Item) -> Optional[Path]:
try:
path = item[self.path_key]
except KeyError:
return None
if path:
return path.encode("utf8")

if isinstance(path, str):
return Path(path)
else:
return None

def _remove_file(self, item: Item):
"""Remove the external file for `item`."""
path = self._get_stored_path(item)
assert path, "File to remove does not have a path"
_remove(path)
util.prune_dirs(
# Although the types for `prune_dirs()` require a `str`
# argument the function accepts a `bytes` argument.
cast(str, path),
root=self.directory,
)
if path:
path.unlink(missing_ok=True)
util.prune_dirs(str(path), root=str(self.directory))
del item[self.path_key]

def _converter(self) -> "Worker":
def _convert(item: Item):
dest = self.destination(item)
util.mkdirall(dest)
util.copy(item.path, dest, replace=True)
dest.parent.mkdir(exist_ok=True, parents=True)
shutil.copyfile(item.path, dest)
return item, dest

return Worker(_convert, self.max_workers)

def _sync_art(self, item: Item, path: bytes):
def _sync_art(self, item: Item, path: Path):
"""Embed artwork in the file at `path`."""
album = item.get_album()
if album and album.artpath and os.path.isfile(syspath(album.artpath)):
self._log.debug(
f"Embedding art from {displayable_path(album.artpath)} into {displayable_path(path)}"
)
art.embed_item(self._log, item, album.artpath, itempath=path)
if album and album.artpath and Path(str(album.artpath, "utf8")).is_file():
self._log.debug(f"Embedding art from {album.artpath} into {path}")
art.embed_item(self._log, item, album.artpath, itempath=bytes(path))


class ExternalConvert(External):
Expand All @@ -394,26 +367,26 @@ def _converter(self) -> "Worker":
def _convert(item: Item):
dest = self.destination(item)
with fs_lock:
util.mkdirall(dest)
dest.parent.mkdir(exist_ok=True, parents=True)

if self._should_transcode(item):
self._encode(self.convert_cmd, item.path, dest)
self._encode(self.convert_cmd, item.path, bytes(dest))
# Don't rely on the converter to write correct/complete tags.
item.write(path=dest)
item.write(path=bytes(dest))
else:
self._log.debug(f"copying {displayable_path(dest)}")
util.copy(item.path, dest, replace=True)
self._log.debug(f"copying {dest}")
shutil.copyfile(item.path, dest)
if self._embed:
self._sync_art(item, dest)
return item, dest

return Worker(_convert, self.max_workers)

@override
def destination(self, item: Item) -> bytes:
def destination(self, item: Item) -> Path:
dest = super().destination(item)
if self._should_transcode(item):
return os.path.splitext(dest)[0] + b"." + self.ext
return dest.with_suffix("." + self.ext.decode("utf8"))
else:
return dest

Expand Down Expand Up @@ -444,21 +417,17 @@ def parse_config(self, config: confuse.ConfigView):

@override
def item_change_actions(
self, item: Item, path: bytes, dest: bytes
self, item: Item, actual: Path, dest: Path
) -> Sequence[Action]:
"""Returns the necessary actions for items that were previously in the
external collection, but might require metadata updates.
"""

if path != dest:
return [Action.MOVE]

try:
link_target_correct = os.path.samefile(path, item.path)
except FileNotFoundError:
link_target_correct = False

if link_target_correct:
if (
actual == dest
and actual.is_file() # Symlink not broken, `.samefile()` doesn’t throw
and actual.samefile(Path(str(item.path, "utf8")))
):
return []
else:
return [Action.MOVE]
Expand All @@ -471,43 +440,44 @@ def update(self, create: Optional[bool] = None):
for action in actions:
if action == Action.MOVE:
assert path is not None # action guarantees that `path` is not none
print_(f">{displayable_path(path)} -> {displayable_path(dest)}")
print_(f">{path} -> {dest}")
self._remove_file(item)
self._create_symlink(item)
self._set_stored_path(item, dest)
elif action == Action.ADD:
print_(f"+{displayable_path(dest)}")
print_(f"+{dest}")
self._create_symlink(item)
self._set_stored_path(item, dest)
elif action == Action.REMOVE:
assert path is not None # action guarantees that `path` is not none
print_(f"-{displayable_path(path)}")
print_(f"-{path}")
self._remove_file(item)
else:
continue
item.store()

def _create_symlink(self, item: Item):
dest = self.destination(item)
util.mkdirall(dest)
dest.parent.mkdir(exist_ok=True, parents=True)
item_path = Path(str(item.path, "utf8"))
link = (
os.path.relpath(item.path, os.path.dirname(dest))
os.path.relpath(item_path, dest.parent)
if self.relativelinks == SymlinkType.RELATIVE
else item.path
else item_path
)
util.link(link, dest)
dest.symlink_to(link)

@override
def _sync_art(self, item: Item, path: bytes):
def _sync_art(self, item: Item, path: Path):
pass


class Worker(futures.ThreadPoolExecutor):
def __init__(
self, fn: Callable[[Item], Tuple[Item, bytes]], max_workers: Optional[int]
self, fn: Callable[[Item], Tuple[Item, Path]], max_workers: Optional[int]
):
super().__init__(max_workers)
self._tasks: Set[futures.Future[Tuple[Item, bytes]]] = set()
self._tasks: Set[futures.Future[Tuple[Item, Path]]] = set()
self._fn = fn

def run(self, item: Item):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ preview = true
extend-select = [
"I", # Sort imports
"C", # Pyflakes conventions
"PTH", # Use pathlib instead of os
"PIE", # Misc. lints
"UP", # Enforce modern Python syntax
"FURB", # Also enforce more modern Python syntax
Expand Down
Loading
Loading