Skip to content

Commit

Permalink
FIX: Cache metadata against datafile/dataset instead of path
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Jul 8, 2024
1 parent 8cfcec4 commit 2829c63
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 82 deletions.
5 changes: 5 additions & 0 deletions octue/mixins/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
class Metadata:
_METADATA_ATTRIBUTES = tuple()

@property
@abstractmethod
def metadata_path(self):
pass

@property
def metadata_hash_value(self):
"""Get the hash of the instance's metadata, not including its ID.
Expand Down
26 changes: 13 additions & 13 deletions octue/resources/datafile.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,17 @@ def cloud_hash_value(self):
"""
return self._cloud_metadata.get("crc32c")

@property
def metadata_path(self):
"""Get the path to the datafile's local metadata file (if the datafile exists locally).
:return str|None:
"""
if not self.exists_locally:
return None

return os.path.join(os.path.dirname(self._local_path), METADATA_FILENAME)

@property
def timestamp(self):
"""Get the timestamp of the datafile.
Expand Down Expand Up @@ -307,17 +318,6 @@ def open(self):
"""
return functools.partial(_DatafileContextManager, self)

@property
def _local_metadata_path(self):
"""Get the path to the datafile's local metadata file (if the datafile exists locally).
:return str|None:
"""
if not self.exists_locally:
return None

return os.path.join(os.path.dirname(self._local_path), METADATA_FILENAME)

def __enter__(self):
self._open_context_manager = self.open(**self._open_attributes)
return self, self._open_context_manager.__enter__()
Expand Down Expand Up @@ -462,7 +462,7 @@ def update_local_metadata(self):
:return None:
"""
with UpdateLocalMetadata(self._local_metadata_path) as existing_metadata_records:
with UpdateLocalMetadata(self) as existing_metadata_records:
if not existing_metadata_records.get("datafiles"):
existing_metadata_records["datafiles"] = {}

Expand Down Expand Up @@ -596,7 +596,7 @@ def _use_local_metadata(self):
:return None:
"""
existing_metadata_records = load_local_metadata_file(self._local_metadata_path)
existing_metadata_records = load_local_metadata_file(self)
datafile_metadata = existing_metadata_records.get("datafiles", {}).get(self.name, {})

if not datafile_metadata:
Expand Down
12 changes: 6 additions & 6 deletions octue/resources/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def all_files_are_in_cloud(self):
return all(file.exists_in_cloud for file in self.files)

@property
def _metadata_path(self):
def metadata_path(self):
"""Get the path to the dataset's metadata file.
:return str:
Expand Down Expand Up @@ -245,15 +245,15 @@ def update_cloud_metadata(self):
"""
GoogleCloudStorageClient().upload_from_string(
string=json.dumps({"dataset": self.to_primitive(include_files=False)}, cls=OctueJSONEncoder),
cloud_path=self._metadata_path,
cloud_path=self.metadata_path,
)

def update_local_metadata(self):
"""Create or update the local octue metadata file with the dataset's metadata.
:return None:
"""
with UpdateLocalMetadata(self._metadata_path) as existing_metadata_records:
with UpdateLocalMetadata(self) as existing_metadata_records:
existing_metadata_records["dataset"] = self.to_primitive(include_files=False)
os.makedirs(self.path, exist_ok=True)

Expand Down Expand Up @@ -493,10 +493,10 @@ def _get_cloud_metadata(self):

storage_client = GoogleCloudStorageClient()

if not storage_client.exists(cloud_path=self._metadata_path):
if not storage_client.exists(cloud_path=self.metadata_path):
return

self._cloud_metadata = json.loads(storage_client.download_as_string(cloud_path=self._metadata_path)).get(
self._cloud_metadata = json.loads(storage_client.download_as_string(cloud_path=self.metadata_path)).get(
"dataset", {}
)

Expand All @@ -519,7 +519,7 @@ def _use_local_metadata(self):
:return None:
"""
local_metadata = load_local_metadata_file(self._metadata_path)
local_metadata = load_local_metadata_file(self)
dataset_metadata = local_metadata.get("dataset", {})

if not dataset_metadata:
Expand Down
74 changes: 37 additions & 37 deletions octue/utils/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,103 +15,103 @@


class UpdateLocalMetadata:
"""A context manager that provides the contents of the given local metadata file and updates it with any changes
made within its context. The local metadata is retrieved either from the disk or from the cache as appropriate.
"""A context manager that provides the contents of the given dataset's or datafile's local metadata file and updates
it with any changes made within its context. The local metadata is retrieved either from the disk or from the cache
as appropriate.
:param str path: the path to the local metadata. The file must be in JSON format.
:param octue.resources.datafile.Datafile|octue.resources.dataset.Dataset datafile_or_dataset: the datafile or dataset to update the local metadata for
:return None:
"""

def __init__(self, path=METADATA_FILENAME):
self.path = path
def __init__(self, datafile_or_dataset):
self.datafile_or_dataset = datafile_or_dataset
self._local_metadata = None

def __enter__(self):
"""Load the local metadata file and return its contents.
:return any: the contents of the local metadata file (converted from the JSON in the local metadata file)
"""
self._local_metadata = load_local_metadata_file(self.path)
self._local_metadata = load_local_metadata_file(self.datafile_or_dataset)
return self._local_metadata

def __exit__(self, exc_type, exc_val, exc_tb):
"""Write any changes to the contents of the file.
:return None:
"""
overwrite_local_metadata_file(self._local_metadata, self.path)
overwrite_local_metadata_file(self._local_metadata, self.datafile_or_dataset)


def load_local_metadata_file(path=METADATA_FILENAME):
"""Load metadata from a local metadata records file, returning an empty dictionary if the file does not exist or is
incorrectly formatted. If the file has already been cached, its contents are retrieved from the cache.
def load_local_metadata_file(datafile_or_dataset):
"""Load metadata from the local metadata records file for a datafile or dataset, returning an empty dictionary if
the file does not exist or is incorrectly formatted. If the file has already been cached, its contents are retrieved
from the cache.
:param str path: the path to the local metadata file
:param octue.resources.datafile.Datafile|octue.resources.dataset.Dataset datafile_or_dataset: the datafile or dataset to load the local metadata file for
:return dict: the contents of the local metadata file
"""
absolute_path = os.path.abspath(path)
cached_metadata = _get_metadata_from_cache(absolute_path)
cached_metadata = _get_metadata_from_cache(datafile_or_dataset)

if cached_metadata:
return cached_metadata

if not os.path.exists(path):
if not os.path.exists(datafile_or_dataset.metadata_path):
local_metadata = {}

else:
with open(path) as f:
with open(datafile_or_dataset.metadata_path) as f:
try:
local_metadata = json.load(f, cls=OctueJSONDecoder)
except json.decoder.JSONDecodeError:
logger.warning(
f"The metadata file at {path!r} is incorrectly formatted so no metadata can be read from it. "
"Please fix or delete it."
f"The metadata file at {datafile_or_dataset.metadata_path!r} is incorrectly formatted so no "
f"metadata can be read from it. Please fix or delete it."
)
local_metadata = {}

_overwrite_cache_entry(absolute_path, local_metadata)
_overwrite_cache_entry(datafile_or_dataset, local_metadata)
return local_metadata


def overwrite_local_metadata_file(data, path=METADATA_FILENAME):
"""Create or overwrite the given local metadata file with the given data. If the data to overwrite the file with is
the same as the file's cache entry, no changes are made. If it's not, the cache entry is updated and the file is
overwritten.
def overwrite_local_metadata_file(data, datafile_or_dataset):
"""Create or overwrite the local metadata file of a datafile or dataset with the given data. If the data to
overwrite the file with is the same as the file's cache entry, no changes are made. If it's not, the cache entry is
updated and the file is overwritten.
:param dict data: the data to overwrite the local metadata file with
:param str path: the path to the local metadata file
:param octue.resources.datafile.Datafile|octue.resources.dataset.Dataset datafile_or_dataset: the datafile or dataset to overwrite the local metadata file for
:return None:
"""
absolute_path = os.path.abspath(path)
cached_metadata = _get_metadata_from_cache(absolute_path)
cached_metadata = _get_metadata_from_cache(datafile_or_dataset)

if data == cached_metadata:
logger.debug("Avoiding overwriting local metadata file - its data is already in sync with the cache.")
return

_overwrite_cache_entry(absolute_path, data)
_overwrite_cache_entry(datafile_or_dataset, data)

with open(path, "w") as f:
with open(datafile_or_dataset.metadata_path, "w") as f:
json.dump(data, f, cls=OctueJSONEncoder, indent=4)
f.write("\n")


def _get_metadata_from_cache(absolute_path):
"""Get the metadata for the given local metadata file from the cache. If it's not cached, return `None`.
def _get_metadata_from_cache(datafile_or_dataset):
"""Get metadata for a datafile or dataset from the cache. If it's not cached, return `None`.
:param str absolute_path: the path to the local metadata file
:param octue.resources.datafile.Datafile|octue.resources.dataset.Dataset datafile_or_dataset: the datafile or dataset to get metadata from the cache for
:return dict|None: the metadata or, if the file hasn't been cached, `None`
"""
logger.debug("Using cached local metadata.")
return copy.deepcopy(cached_local_metadata_files.get(absolute_path))
logger.debug("Using cached local metadata for %r.", datafile_or_dataset)
return copy.deepcopy(cached_local_metadata_files.get(datafile_or_dataset.id))


def _overwrite_cache_entry(absolute_path, data):
"""Overwrite the metadata cache entry for the given local metadata file.
def _overwrite_cache_entry(datafile_or_dataset, data):
"""Overwrite the metadata cache entry for a datafile or dataset.
:param str absolute_path: the path to the local metadata file
:param octue.resources.datafile.Datafile|octue.resources.dataset.Dataset datafile_or_dataset: the datafile or dataset to overwrite metadata in the cache for
:param dict data: the data to overwrite the existing cache entry with.
:return None:
"""
cached_local_metadata_files[absolute_path] = copy.deepcopy(data)
logger.debug("Updated local metadata cache.")
cached_local_metadata_files[datafile_or_dataset.id] = copy.deepcopy(data)
logger.debug("Updated local metadata cache for %r.", datafile_or_dataset)
63 changes: 37 additions & 26 deletions tests/utils/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import tempfile
from unittest.mock import patch

from octue.resources import Datafile, Dataset
from octue.utils.metadata import cached_local_metadata_files, load_local_metadata_file, overwrite_local_metadata_file
from tests.base import BaseTestCase

Expand All @@ -13,12 +14,14 @@ def test_warning_raised_and_empty_dictionary_returned_if_local_metadata_file_cor
"""Test that a warning is raised and an empty dictionary is returned if trying to load a corrupted local
metadata file (e.g. not in JSON format).
"""
with tempfile.NamedTemporaryFile(delete=False) as temporary_file:
with open(temporary_file.name, "w") as f:
with tempfile.TemporaryDirectory() as temporary_directory:
datafile = Datafile(os.path.join(temporary_directory, "datafile.dat"))

with open(datafile.metadata_path, "w") as f:
f.write("some gobbledeegook")

with self.assertLogs(level=logging.WARNING) as logging_context:
local_metadata = load_local_metadata_file(temporary_file.name)
local_metadata = load_local_metadata_file(datafile)

self.assertEqual(local_metadata, {})

Expand All @@ -30,23 +33,25 @@ def test_warning_raised_and_empty_dictionary_returned_if_local_metadata_file_cor
def test_empty_dictionary_returned_if_local_metadata_file_does_not_exist(self):
"""Test that an empty dictionary is returned if trying to load a local metadata file that doesn't exist."""
with tempfile.TemporaryDirectory() as temporary_directory:
self.assertEqual(load_local_metadata_file(path=os.path.join(temporary_directory, ".octue")), {})
self.assertEqual(load_local_metadata_file(Dataset(path=temporary_directory)), {})

def test_local_metadata_is_cached_once_loaded_in_python_session(self):
"""Test that, if a local metadata file has been loaded once during the python session, it is loaded from the
cache instead of from disk for the rest of the session.
"""
with tempfile.NamedTemporaryFile(delete=False) as temporary_file:
with open(temporary_file.name, "w") as f:
with tempfile.TemporaryDirectory() as temporary_directory:
datafile = Datafile(os.path.join(temporary_directory, "datafile.dat"))

with open(datafile.metadata_path, "w") as f:
json.dump({"some": "data"}, f)

# Load the metadata file once and check its contents have been cached.
load_local_metadata_file(temporary_file.name)
self.assertEqual(cached_local_metadata_files[temporary_file.name], {"some": "data"})
load_local_metadata_file(datafile)
self.assertEqual(cached_local_metadata_files[datafile.id], {"some": "data"})

# Check that it's not loaded from disk again.
with patch("builtins.open") as mock_open:
local_metadata = load_local_metadata_file(temporary_file.name)
local_metadata = load_local_metadata_file(datafile)

mock_open.assert_not_called()
self.assertEqual(local_metadata, {"some": "data"})
Expand All @@ -55,18 +60,20 @@ def test_local_metadata_is_cached_if_already_written_to_in_python_session(self):
"""Test that, if a local metadata file has been written to during the python session, it is loaded from the
cache instead of from disk for the rest of the session.
"""
with tempfile.NamedTemporaryFile(delete=False) as temporary_file:
with tempfile.TemporaryDirectory() as temporary_directory:
datafile = Datafile(os.path.join(temporary_directory, "datafile.dat"))

# Write the metadata file and check its contents have been cached.
overwrite_local_metadata_file(data={"some": "data"}, path=temporary_file.name)
self.assertEqual(cached_local_metadata_files[temporary_file.name], {"some": "data"})
overwrite_local_metadata_file(data={"some": "data"}, datafile_or_dataset=datafile)
self.assertEqual(cached_local_metadata_files[datafile.id], {"some": "data"})

# Check the file has been written correctly.
with open(temporary_file.name) as f:
with open(datafile.metadata_path) as f:
self.assertEqual(json.load(f), {"some": "data"})

# Check that it's not loaded from disk again.
with patch("builtins.open") as mock_open:
local_metadata = load_local_metadata_file(temporary_file.name)
local_metadata = load_local_metadata_file(datafile)

mock_open.assert_not_called()
self.assertEqual(local_metadata, {"some": "data"})
Expand All @@ -75,18 +82,20 @@ def test_cache_not_busted_if_overwriting_with_same_data(self):
"""Test that the cache is not busted and the local metadata file is not rewritten if trying to overwrite it with
the same data as is in the cache.
"""
with tempfile.NamedTemporaryFile(delete=False) as temporary_file:
with open(temporary_file.name, "w") as f:
with tempfile.TemporaryDirectory() as temporary_directory:
datafile = Datafile(os.path.join(temporary_directory, "datafile.dat"))

with open(datafile.metadata_path, "w") as f:
json.dump({"some": "data"}, f)

# Load the metadata file once and check its contents have been cached.
load_local_metadata_file(temporary_file.name)
self.assertEqual(cached_local_metadata_files[temporary_file.name], {"some": "data"})
load_local_metadata_file(datafile)
self.assertEqual(cached_local_metadata_files[datafile.id], {"some": "data"})

# Overwrite the metadata file with the same data.
with self.assertLogs(level=logging.DEBUG) as logging_context:
with patch("builtins.open") as mock_open:
overwrite_local_metadata_file({"some": "data"}, path=temporary_file.name)
overwrite_local_metadata_file({"some": "data"}, datafile)

mock_open.assert_not_called()

Expand All @@ -99,19 +108,21 @@ def test_cache_busted_if_overwriting_with_new_data(self):
"""Test that the cache is busted and the local metadata file is rewritten if trying to overwrite it with
data different from what's in the cache.
"""
with tempfile.NamedTemporaryFile(delete=False) as temporary_file:
with open(temporary_file.name, "w") as f:
with tempfile.TemporaryDirectory() as temporary_directory:
datafile = Datafile(os.path.join(temporary_directory, "datafile.dat"))

with open(datafile.metadata_path, "w") as f:
json.dump({"some": "data"}, f)

# Load the metadata file once and check its contents have been cached.
load_local_metadata_file(temporary_file.name)
self.assertEqual(cached_local_metadata_files[temporary_file.name], {"some": "data"})
load_local_metadata_file(datafile)
self.assertEqual(cached_local_metadata_files[datafile.id], {"some": "data"})

overwrite_local_metadata_file({"new": "information"}, path=temporary_file.name)
overwrite_local_metadata_file({"new": "information"}, datafile)

# Check the metadata file has been overwritten.
with open(temporary_file.name) as f:
with open(datafile.metadata_path) as f:
self.assertEqual(json.load(f), {"new": "information"})

# Check the cache entry has been updated.
self.assertEqual(cached_local_metadata_files[temporary_file.name], {"new": "information"})
self.assertEqual(cached_local_metadata_files[datafile.id], {"new": "information"})

0 comments on commit 2829c63

Please sign in to comment.