Skip to content

Commit

Permalink
add rule parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
azuline committed Nov 1, 2023
1 parent e89814d commit 53a8c21
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 147 deletions.
1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def config(isolated_dir: Path) -> Config:
cover_art_stems=["cover", "folder", "art", "front"],
valid_art_exts=["jpg", "jpeg", "png"],
ignore_release_directories=[],
stored_metadata_rules=[],
hash="00ff",
)

Expand Down
19 changes: 18 additions & 1 deletion rose/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tomllib

from rose.common import RoseError
from rose.rule_parser import InvalidRuleSpecError, MetadataRule

XDG_CONFIG_ROSE = Path(appdirs.user_config_dir("rose"))
XDG_CONFIG_ROSE.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -66,6 +67,8 @@ class Config:

ignore_release_directories: list[str]

stored_metadata_rules: list[MetadataRule]

hash: str

@classmethod
Expand All @@ -79,7 +82,9 @@ def parse(cls, config_path_override: Path | None = None) -> Config:
except FileNotFoundError as e:
raise ConfigNotFoundError(f"Configuration file not found ({cfgpath})") from e
except tomllib.TOMLDecodeError as e:
raise ConfigDecodeError("Failed to decode configuration file: invalid TOML") from e
raise ConfigDecodeError(
f"Failed to decode configuration file: invalid TOML: {e}"
) from e

try:
music_source_dir = Path(data["music_source_dir"]).expanduser()
Expand Down Expand Up @@ -315,6 +320,17 @@ def parse(cls, config_path_override: Path | None = None) -> Config:
"must be a list of strings"
) from e

stored_metadata_rules: list[MetadataRule] = []
d = None
try:
for d in data.get("stored_metadata_rules", []):
stored_metadata_rules.append(MetadataRule.parse_dict(d))
except InvalidRuleSpecError as e:
raise InvalidConfigValueError(
f"Invalid value for stored_metadata_rules in configuration file ({cfgpath}): "
f"rule {d} could not be parsed: {e}"
) from e

return cls(
music_source_dir=music_source_dir,
fuse_mount_dir=fuse_mount_dir,
Expand All @@ -331,6 +347,7 @@ def parse(cls, config_path_override: Path | None = None) -> Config:
cover_art_stems=cover_art_stems,
valid_art_exts=valid_art_exts,
ignore_release_directories=ignore_release_directories,
stored_metadata_rules=stored_metadata_rules,
hash=sha256(cfgtext.encode()).hexdigest(),
)

Expand Down
20 changes: 20 additions & 0 deletions rose/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from rose.config import Config, ConfigNotFoundError, InvalidConfigValueError, MissingConfigKeyError
from rose.rule_parser import MetadataRule, ReplaceAction


def test_config_minimal() -> None:
Expand Down Expand Up @@ -43,6 +44,9 @@ def test_config_full() -> None:
cover_art_stems = [ "aa", "bb" ]
valid_art_exts = [ "tiff" ]
ignore_release_directories = [ "dummy boy" ]
stored_metadata_rules = [
{{ tags = "tracktitle", matcher = "lala", action = {{ kind = "replace", replacement = "hihi" }} }}
]
""" # noqa: E501
)

Expand Down Expand Up @@ -79,6 +83,13 @@ def test_config_full() -> None:
cover_art_stems=["aa", "bb"],
valid_art_exts=["tiff"],
ignore_release_directories=["dummy boy"],
stored_metadata_rules=[
MetadataRule(
tags=["tracktitle"],
matcher="lala",
action=ReplaceAction(replacement="hihi"),
)
],
hash=c.hash,
)

Expand Down Expand Up @@ -390,3 +401,12 @@ def write(x: str) -> None:
== f"Invalid value for ignore_release_directories in configuration file ({path}): must be a list of strings" # noqa
)
config += '\nignore_release_directories = [ ".stversions" ]'

# stored_metadata_rules
write(config + '\nstored_metadata_rules = ["lalala"]')
with pytest.raises(InvalidConfigValueError) as excinfo:
Config.parse(config_path_override=path)
assert (
str(excinfo.value)
== f"Invalid value for stored_metadata_rules in configuration file ({path}): rule lalala could not be parsed: Type of metadata rule data must be dict: got <class 'str'>" # noqa: E501
)
235 changes: 235 additions & 0 deletions rose/rule_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
"""
The rule_parser module provides the typedef and parser for the rules engine. This is split out from
the rules engine in order to avoid a dependency cycle between the config module and the rules
module.
"""

from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Any, Literal

from rose.common import RoseError

logger = logging.getLogger(__name__)


class InvalidRuleSpecError(RoseError):
pass


Tag = Literal[
"tracktitle",
"year",
"tracknumber",
"discnumber",
"albumtitle",
"genre",
"label",
"releasetype",
"artist",
]

ALL_TAGS: list[Tag] = [
"tracktitle",
"year",
"tracknumber",
"discnumber",
"albumtitle",
"genre",
"label",
"releasetype",
"artist",
]


@dataclass
class ReplaceAction:
"""
Replaces the matched tag with `replacement`. For multi-valued tags, only the matched value is
replaced; the other values are left alone.
"""

replacement: str


@dataclass
class ReplaceAllAction:
"""Specifically useful for multi-valued tags, replaces all values."""

replacement: list[str]


@dataclass
class SedAction:
"""
Executes a regex substitution on a tag value. For multi-valued tags, only the matched tag is
modified; the other values are left alone.
"""

src: re.Pattern[str]
dst: str


@dataclass
class SplitAction:
"""
Splits a tag into multiple tags on the provided delimiter. For multi-valued tags, only the
matched tag is split; the other values are left alone.
"""

delimiter: str


@dataclass
class DeleteAction:
"""
Deletes the tag value. In a multi-valued tag, only the matched value is deleted; the other
values are left alone.
"""

pass


@dataclass
class MetadataRule:
tags: list[Tag]
matcher: str
action: ReplaceAction | ReplaceAllAction | SedAction | SplitAction | DeleteAction

def __str__(self) -> str:
r = ",".join(self.tags)
r += ":"
r += self.matcher.replace(":", r"\:")
r += ":"
if isinstance(self.action, ReplaceAction):
r += "replace:"
r += self.action.replacement
elif isinstance(self.action, ReplaceAllAction):
r += "replaceall:"
r += ";".join(self.action.replacement)
elif isinstance(self.action, SedAction):
r += "sed:"
r += str(self.action.src.pattern).replace(":", r"\:")
r += ":"
r += self.action.dst.replace(":", r"\:")
elif isinstance(self.action, SplitAction):
r += "spliton:"
r += self.action.delimiter
elif isinstance(self.action, DeleteAction):
r += "delete"
return r

@classmethod
def parse_dict(cls, data: dict[str, Any]) -> MetadataRule:
if not isinstance(data, dict):
raise InvalidRuleSpecError(f"Type of metadata rule data must be dict: got {type(data)}")

try:
tags = data["tags"]
except KeyError as e:
raise InvalidRuleSpecError("Key `tags` not found") from e
if isinstance(tags, str):
tags = [tags]
if not isinstance(tags, list):
raise InvalidRuleSpecError(
f"Key `tags` must be a string or a list of strings: got {type(tags)}"
)
for t in tags:
if t not in ALL_TAGS and t != "*":
raise InvalidRuleSpecError(
f"Key `tags`'s values must be one of *, {', '.join(ALL_TAGS)}: got {t}"
)
if any(t == "*" for t in tags):
tags = ALL_TAGS

try:
matcher = data["matcher"]
except KeyError as e:
raise InvalidRuleSpecError("Key `matcher` not found") from e
if not isinstance(matcher, str):
raise InvalidRuleSpecError(f"Key `matcher` must be a string: got {type(matcher)}")

try:
action_dict = data["action"]
except KeyError as e:
raise InvalidRuleSpecError("Key `action` not found") from e
if not isinstance(action_dict, dict):
raise InvalidRuleSpecError(
f"Key `action` must be a dictionary: got {type(action_dict)}"
)

try:
action_kind = action_dict["kind"]
except KeyError as e:
raise InvalidRuleSpecError("Key `action.kind` not found") from e

action: ReplaceAction | ReplaceAllAction | SedAction | SplitAction | DeleteAction
if action_kind == "replace":
try:
action = ReplaceAction(replacement=action_dict["replacement"])
except KeyError as e:
raise InvalidRuleSpecError("Key `action.replacement` not found") from e
if not isinstance(action.replacement, str):
raise InvalidRuleSpecError(
f"Key `action.replacement` must be a string: got {type(action.replacement)}"
)
elif action_kind == "replaceall":
try:
action = ReplaceAllAction(replacement=action_dict["replacement"])
except KeyError as e:
raise InvalidRuleSpecError("Key `action.replacement` not found") from e
if not isinstance(action.replacement, list):
raise InvalidRuleSpecError(
"Key `action.replacement` must be a list of strings: "
f"got {type(action.replacement)}"
)
for t in action.replacement:
if not isinstance(t, str):
raise InvalidRuleSpecError(
f"Key `action.replacement`'s values must be strings: got {type(t)}"
)
elif action_kind == "sed":
try:
action_src = re.compile(action_dict["src"])
except KeyError as e:
raise InvalidRuleSpecError("Key `action.src` not found") from e
except re.error as e:
raise InvalidRuleSpecError(
"Key `action.src` contains an invalid regular expression"
) from e

try:
action_dst = action_dict["dst"]
except KeyError as e:
raise InvalidRuleSpecError("Key `action.dst` not found") from e
if not isinstance(action_dst, str):
raise InvalidRuleSpecError(
f"Key `action.dst` must be a string: got {type(action_dst)}"
)

action = SedAction(src=action_src, dst=action_dst)
elif action_kind == "spliton":
try:
action = SplitAction(delimiter=action_dict["delimiter"])
except KeyError as e:
raise InvalidRuleSpecError("Key `action.delimiter` not found") from e
if not isinstance(action.delimiter, str):
raise InvalidRuleSpecError(
f"Key `action.delimiter` must be a string: got {type(action.delimiter)}"
)
elif action_kind == "delete":
action = DeleteAction()
else:
raise InvalidRuleSpecError(
"Key `action.kind` must be one of replace, replaceall, sed, spliton, delete: "
f"got {action_kind}"
)

return cls(
tags=tags,
matcher=matcher,
action=action,
)
Loading

0 comments on commit 53a8c21

Please sign in to comment.