Skip to content

Commit

Permalink
add locks to protect against concurrent program execution
Browse files Browse the repository at this point in the history
  • Loading branch information
azuline committed Oct 25, 2023
1 parent c497392 commit 41f7b82
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 160 deletions.
84 changes: 53 additions & 31 deletions rose/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,35 @@ def migrate_database(c: Config) -> None:
@contextmanager
def lock(c: Config, name: str, timeout: float = 1.0) -> Iterator[None]:
    """Hold a cross-process advisory lock named `name` for the duration of the body.

    The lock is a row in the `locks` table: we poll until no unexpired row
    exists for `name`, then insert a row whose `valid_until` lease expires
    `timeout` seconds from now. The row is always deleted on exit, even when
    the body raises.

    NOTE(review): the SELECT-then-INSERT is not atomic across connections —
    two processes could both see the lock as free and both insert. Confirm
    that the transaction/locking semantics of `connect` make this safe.
    """
    try:
        while True:
            with connect(c) as conn:
                cursor = conn.execute("SELECT MAX(valid_until) FROM locks WHERE name = ?", (name,))
                row = cursor.fetchone()
                # The lock is free if no row exists or the holder's lease has
                # already expired.
                if not row or not row[0] or row[0] < time.time():
                    logger.debug(f"Acquiring lock for {name} with timeout {timeout}")
                    valid_until = time.time() + timeout
                    conn.execute(
                        "INSERT INTO locks (name, valid_until) VALUES (?, ?)", (name, valid_until)
                    )
                    break
                # Otherwise sleep until the current lease expires and retry.
                sleep = max(0, row[0] - time.time())
                logger.debug(f"Failed to acquire lock for {name}: sleeping for {sleep}")
                time.sleep(sleep)
        yield
    finally:
        logger.debug(f"Releasing lock {name}")
        with connect(c) as conn:
            conn.execute("DELETE FROM locks WHERE name = ?", (name,))


def release_lock_name(release_id: str) -> str:
    """Return the name of the advisory lock guarding the given release."""
    return "release-" + release_id


def collage_lock_name(collage_name: str) -> str:
    """Return the name of the advisory lock guarding the given collage."""
    return "collage-" + collage_name


@dataclass
class CachedArtist:
name: str
Expand Down Expand Up @@ -547,6 +559,8 @@ def _update_cache_for_releases_executor(
)
new_release_id = str(uuid6.uuid7())
datafile_path = source_path / f".rose.{new_release_id}.toml"
# No need to lock here, as since the release ID is new, there is no way there is a
# concurrent writer.
with datafile_path.open("wb") as fp:
tomli_w.dump(asdict(stored_release_data), fp)
release.id = new_release_id
Expand All @@ -563,24 +577,27 @@ def _update_cache_for_releases_executor(
logger.debug(f"Datafile changed for release {source_path}, updating")
release_dirty = True
release.datafile_mtime = datafile_mtime
with datafile_path.open("rb") as fp:
diskdata = tomllib.load(fp)
datafile = StoredDataFile(
new=diskdata.get("new", True),
added_at=diskdata.get(
"added_at",
datetime.now().astimezone().replace(microsecond=0).isoformat(),
),
)
release.new = datafile.new
release.added_at = datafile.added_at
# And then write the data back to disk if it changed. This allows us to update
# datafiles to contain newer default values.
new_resolved_data = asdict(datafile)
if new_resolved_data != diskdata:
logger.debug(f"Updating values in stored data file for release {source_path}")
with datafile_path.open("wb") as fp:
tomli_w.dump(new_resolved_data, fp)
with lock(c, release_lock_name(preexisting_release_id)):
with datafile_path.open("rb") as fp:
diskdata = tomllib.load(fp)
datafile = StoredDataFile(
new=diskdata.get("new", True),
added_at=diskdata.get(
"added_at",
datetime.now().astimezone().replace(microsecond=0).isoformat(),
),
)
release.new = datafile.new
release.added_at = datafile.added_at
# And then write the data back to disk if it changed. This allows us to update
# datafiles to contain newer default values.
new_resolved_data = asdict(datafile)
if new_resolved_data != diskdata:
logger.debug(
f"Updating values in stored data file for release {source_path}"
)
with datafile_path.open("wb") as fp:
tomli_w.dump(new_resolved_data, fp)

# Handle cover art change.
try:
Expand Down Expand Up @@ -737,9 +754,10 @@ def _update_cache_for_releases_executor(
# in this case, we skip this code path once an ID is generated.
track_id = tags.id
if not track_id:
track_id = str(uuid6.uuid7())
tags.id = track_id
tags.flush()
with lock(c, release_lock_name(release.id)):
track_id = str(uuid6.uuid7())
tags.id = track_id
tags.flush()

# And now create the cached track.
track = CachedTrack(
Expand Down Expand Up @@ -1124,14 +1142,18 @@ def update_cache_for_collages(
if nonexistent_release_idxs:
new_diskdata_releases: list[dict[str, str]] = []
removed_releases: list[str] = []
for idx, rls in enumerate(diskdata.get("releases", [])):
if idx in nonexistent_release_idxs:
removed_releases.append(rls["description_meta"])
continue
new_diskdata_releases.append(rls)

with source_path.open("wb") as fp:
tomli_w.dump({"releases": new_diskdata_releases}, fp)
with lock(c, collage_lock_name(name)):
# Re-read disk data here in case it changed. Super rare case, but better to be
# correct than suffer from niche unexplainable bugs.
with source_path.open("rb") as fp:
diskdata = tomllib.load(fp)
for idx, rls in enumerate(diskdata.get("releases", [])):
if idx in nonexistent_release_idxs:
removed_releases.append(rls["description_meta"])
continue
new_diskdata_releases.append(rls)
with source_path.open("wb") as fp:
tomli_w.dump({"releases": new_diskdata_releases}, fp)
logger.info(
f"Removing nonexistent releases from collage {cached_collage.name}: "
f"{','.join(removed_releases)}"
Expand Down
118 changes: 64 additions & 54 deletions rose/collages.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from send2trash import send2trash

from rose.cache import (
collage_lock_name,
list_collage_releases,
list_collages,
lock,
update_cache_evict_nonexistent_collages,
update_cache_for_collages,
)
Expand All @@ -36,29 +38,32 @@ class CollageAlreadyExistsError(RoseError):
def create_collage(c: Config, name: str) -> None:
    """Create an empty collage file and register it in the cache.

    The existence check and the touch happen under the collage's lock so a
    concurrent creator cannot race past the check.

    Raises:
        CollageAlreadyExistsError: if a collage with this name already exists.
    """
    (c.music_source_dir / "!collages").mkdir(parents=True, exist_ok=True)
    path = collage_path(c, name)
    with lock(c, collage_lock_name(name)):
        if path.exists():
            raise CollageAlreadyExistsError(f"Collage {name} already exists")
        path.touch()
    update_cache_for_collages(c, [name], force=True)


def delete_collage(c: Config, name: str) -> None:
    """Send the collage file to the trash and evict it from the cache.

    The existence check and the trashing happen under the collage's lock so a
    concurrent writer cannot race between them.

    Raises:
        CollageDoesNotExistError: if no collage with this name exists.
    """
    path = collage_path(c, name)
    with lock(c, collage_lock_name(name)):
        if not path.exists():
            raise CollageDoesNotExistError(f"Collage {name} does not exist")
        send2trash(path)
    update_cache_evict_nonexistent_collages(c)


def rename_collage(c: Config, old_name: str, new_name: str) -> None:
    """Rename a collage file on disk and refresh the cache.

    Both the old and the new collage locks are held while checking and
    renaming, so neither side can be concurrently created or deleted.

    Raises:
        CollageDoesNotExistError: if the source collage does not exist.
        CollageAlreadyExistsError: if the destination collage already exists.
    """
    logger.info(f"Renaming collage {old_name} to {new_name}")
    old_path = collage_path(c, old_name)
    new_path = collage_path(c, new_name)
    with lock(c, collage_lock_name(old_name)), lock(c, collage_lock_name(new_name)):
        if not old_path.exists():
            raise CollageDoesNotExistError(f"Collage {old_name} does not exist")
        if new_path.exists():
            raise CollageAlreadyExistsError(f"Collage {new_name} already exists")
        old_path.rename(new_path)
    update_cache_for_collages(c, [new_name], force=True)
    update_cache_evict_nonexistent_collages(c)

Expand All @@ -72,12 +77,13 @@ def delete_release_from_collage(
path = collage_path(c, collage_name)
if not path.exists():
raise CollageDoesNotExistError(f"Collage {collage_name} does not exist")
with path.open("rb") as fp:
data = tomllib.load(fp)
data["releases"] = data.get("releases", [])
data["releases"] = [r for r in data.get("releases", []) if r["uuid"] != release_id]
with path.open("wb") as fp:
tomli_w.dump(data, fp)
with lock(c, collage_lock_name(collage_name)):
with path.open("rb") as fp:
data = tomllib.load(fp)
data["releases"] = data.get("releases", [])
data["releases"] = [r for r in data.get("releases", []) if r["uuid"] != release_id]
with path.open("wb") as fp:
tomli_w.dump(data, fp)
logger.info(f"Removed release {release_dirname} from collage {collage_name}")
update_cache_for_collages(c, [collage_name], force=True)

Expand All @@ -91,18 +97,21 @@ def add_release_to_collage(
path = collage_path(c, collage_name)
if not path.exists():
raise CollageDoesNotExistError(f"Collage {collage_name} does not exist")
with path.open("rb") as fp:
data = tomllib.load(fp)
data["releases"] = data.get("releases", [])
# Check to see if release is already in the collage. If so, no op. We don't support duplicate
# collage entries.
for r in data["releases"]:
if r["uuid"] == release_id:
logger.debug(f"No-Opping: Release {release_dirname} already in collage {collage_name}")
return
data["releases"].append({"uuid": release_id, "description_meta": release_dirname})
with path.open("wb") as fp:
tomli_w.dump(data, fp)
with lock(c, collage_lock_name(collage_name)):
with path.open("rb") as fp:
data = tomllib.load(fp)
data["releases"] = data.get("releases", [])
# Check to see if release is already in the collage. If so, no op. We don't support
# duplicate collage entries.
for r in data["releases"]:
if r["uuid"] == release_id:
logger.debug(
f"No-Opping: Release {release_dirname} already in collage {collage_name}"
)
return
data["releases"].append({"uuid": release_id, "description_meta": release_dirname})
with path.open("wb") as fp:
tomli_w.dump(data, fp)
logger.info(f"Added release {release_dirname} to collage {collage_name}")
update_cache_for_collages(c, [collage_name], force=True)

Expand All @@ -121,31 +130,32 @@ def edit_collage_in_editor(c: Config, collage_name: str) -> None:
path = collage_path(c, collage_name)
if not path.exists():
raise CollageDoesNotExistError(f"Collage {collage_name} does not exist")
with path.open("rb") as fp:
data = tomllib.load(fp)
raw_releases = data.get("releases", [])
edited_release_descriptions = click.edit(
"\n".join([r["description_meta"] for r in raw_releases])
)
if edited_release_descriptions is None:
logger.info("Aborting: metadata file not submitted.")
return
uuid_mapping = {r["description_meta"]: r["uuid"] for r in raw_releases}

edited_releases: list[dict[str, Any]] = []
for desc in edited_release_descriptions.strip().split("\n"):
try:
uuid = uuid_mapping[desc]
except KeyError as e:
raise DescriptionMismatchError(
f"Release {desc} does not match a known release in the collage. "
"Was the line edited?"
) from e
edited_releases.append({"uuid": uuid, "description_meta": desc})
data["releases"] = edited_releases

with path.open("wb") as fp:
tomli_w.dump(data, fp)
with lock(c, collage_lock_name(collage_name), timeout=60.0):
with path.open("rb") as fp:
data = tomllib.load(fp)
raw_releases = data.get("releases", [])
edited_release_descriptions = click.edit(
"\n".join([r["description_meta"] for r in raw_releases])
)
if edited_release_descriptions is None:
logger.info("Aborting: metadata file not submitted.")
return
uuid_mapping = {r["description_meta"]: r["uuid"] for r in raw_releases}

edited_releases: list[dict[str, Any]] = []
for desc in edited_release_descriptions.strip().split("\n"):
try:
uuid = uuid_mapping[desc]
except KeyError as e:
raise DescriptionMismatchError(
f"Release {desc} does not match a known release in the collage. "
"Was the line edited?"
) from e
edited_releases.append({"uuid": uuid, "description_meta": desc})
data["releases"] = edited_releases

with path.open("wb") as fp:
tomli_w.dump(data, fp)
logger.info(f"Edited collage {collage_name} from EDITOR")
update_cache_for_collages(c, [collage_name], force=True)

Expand Down
Loading

0 comments on commit 41f7b82

Please sign in to comment.