Merge pull request #83 from DataONEorg/feature-82-hashstoreclient-update

Feature-82: Client Update & Bug Fixes

doulikecookiedough authored Jan 17, 2024
2 parents 382c838 + d5a4bb8 commit acf66b6
Showing 7 changed files with 542 additions and 354 deletions.
13 changes: 13 additions & 0 deletions README.md
@@ -184,6 +184,16 @@ take a longer time to run (relating to the storage of large files) - to execute

## HashStore Client

Client API Options:
- `-getchecksum` (get_hex_digest)
- `-findobject`
- `-storeobject`
- `-storemetadata`
- `-retrieveobject`
- `-retrievemetadata`
- `-deleteobject`
- `-deletemetadata`

How to use the HashStore client (command-line app):
```sh
# Step 1: Create a HashStore
@@ -192,6 +202,9 @@ $ python './src/hashstore/hashstoreclient.py' /path/to/store/ -chs -dp=3 -wp=2 -
# Get the checksum of a data object
$ python './src/hashstore/hashstoreclient.py' /path/to/store/ -getchecksum -pid=content_identifier -algo=SHA-256

# Find an object (returns the content identifier)
$ python './src/hashstore/hashstoreclient.py' /path/to/store/ -findobject -pid=content_identifier

# Store a data object
$ python './src/hashstore/hashstoreclient.py' /path/to/store/ -storeobject -pid=content_identifier -path=/path/to/object
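
# Illustrative sketches based on the option list above; they assume each of
# these calls needs only '-pid' in addition to the store path.
# Retrieve a data object
$ python './src/hashstore/hashstoreclient.py' /path/to/store/ -retrieveobject -pid=content_identifier

# Delete a data object
$ python './src/hashstore/hashstoreclient.py' /path/to/store/ -deleteobject -pid=content_identifier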

301 changes: 160 additions & 141 deletions src/hashstore/filehashstore.py

Large diffs are not rendered by default.

182 changes: 99 additions & 83 deletions src/hashstore/hashstoreclient.py
@@ -104,6 +104,11 @@ def __init__(self):
action="store_true",
help="Delete objects in a HashStore",
)
self.parser.add_argument(
"-gbskip",
dest="gb_file_size_to_skip",
help="Number of objects to convert",
)

# Individual API call related optional arguments
self.parser.add_argument(
@@ -149,6 +154,12 @@ def __init__(self):
action="store_true",
help="Flag to get the hex digest of a data object in HashStore",
)
self.parser.add_argument(
"-findobject",
dest="client_findobject",
action="store_true",
help="Flag to determine if an object is stored in HashStore",
)
self.parser.add_argument(
"-storeobject",
dest="client_storeobject",
@@ -257,17 +268,21 @@ def __init__(self, properties, testflag=None):

# Methods relating to testing HashStore with knbvm (test.arcticdata.io)

def store_to_hashstore_from_list(self, origin_dir, obj_type, num):
def store_to_hashstore_from_list(self, origin_dir, obj_type, num, skip_obj_size):
"""Store objects in a given directory into HashStore.
:param str origin_dir: Directory to convert.
:param str obj_type: Type of objects ('object' or 'metadata').
:param int num: Number of files to store.
:param int skip_obj_size: Size in GB above which objects are skipped (e.g., 4 = 4 GB)
"""
info_msg = f"HashStore Client - Begin storing {obj_type} objects."
info_msg = f"HashStoreClient - Begin storing {obj_type} objects."
logging.info(info_msg)
# Object and Metadata list
metacat_obj_list = self.metacatdb.get_object_metadata_list(origin_dir, num)
metacat_obj_list = self.metacatdb.get_object_metadata_list(
origin_dir, num, skip_obj_size
)

# Get list of objects to store from metacat db
if obj_type == self.OBJ_TYPE:
@@ -331,19 +346,24 @@ def try_store_metadata(self, obj_tuple):
except Exception as so_exception:
print(so_exception)

def retrieve_and_validate_from_hashstore(self, origin_dir, obj_type, num):
def retrieve_and_validate_from_hashstore(
self, origin_dir, obj_type, num, skip_obj_size
):
"""Retrieve objects or metadata from a Hashstore and validate the content.
:param str origin_dir: Directory to convert.
:param str obj_type: Type of objects ('object' or 'metadata').
:param int num: Number of files to store.
:param int skip_obj_size: Size of obj in GB to skip (ex. 4 = 4GB)
"""
info_msg = (
f"HashStore Client - Begin retrieving and validating {obj_type} objects."
)
logging.info(info_msg)
# Object and Metadata list
metacat_obj_list = self.metacatdb.get_object_metadata_list(origin_dir, num)
metacat_obj_list = self.metacatdb.get_object_metadata_list(
origin_dir, num, skip_obj_size
)

# Get list of objects to store from metacat db
logging.info("HashStore Client - Refining object list for %s", obj_type)
@@ -427,17 +447,20 @@ def validate_metadata(self, obj_tuple):

return

def delete_objects_from_list(self, origin_dir, obj_type, num):
def delete_objects_from_list(self, origin_dir, obj_type, num, skip_obj_size):
"""Deletes objects in a given directory into HashStore.
:param str origin_dir: Directory to convert.
:param str obj_type: Type of objects ('object' or 'metadata').
:param int num: Number of files to store.
:param int skip_obj_size: Size of obj in GB to skip (ex. 4 = 4GB)
"""
info_msg = f"HashStore Client - Begin deleting {obj_type} objects."
logging.info(info_msg)
# Object and Metadata list
metacat_obj_list = self.metacatdb.get_object_metadata_list(origin_dir, num)
metacat_obj_list = self.metacatdb.get_object_metadata_list(
origin_dir, num, skip_obj_size
)

# Get list of objects to store from metacat db
if obj_type == self.OBJ_TYPE:
@@ -536,11 +559,12 @@ def __init__(self, hashstore_path, hashstore):
checked_property = yaml_data[key]
self.db_yaml_dict[key] = checked_property

def get_object_metadata_list(self, origin_directory, num):
def get_object_metadata_list(self, origin_directory, num, skip_obj_size=None):
"""Query the Metacat database for the full object and metadata list, ordered by GUID.
:param str origin_directory: 'var/metacat/data' or 'var/metacat/documents'.
:param int num: Number of rows to retrieve from the Metacat database.
:param int skip_obj_size: Size in GB above which objects are skipped (e.g., 4 = 4 GB); defaults to None
"""
# Create a connection to the database
db_user = self.db_yaml_dict["db_user"]
@@ -567,7 +591,7 @@ def get_object_metadata_list(self, origin_directory, num):
limit_query = f" LIMIT {num}"
query = f"""SELECT identifier.guid, identifier.docid, identifier.rev,
systemmetadata.object_format, systemmetadata.checksum,
systemmetadata.checksum_algorithm FROM identifier INNER JOIN systemmetadata
systemmetadata.checksum_algorithm, systemmetadata.size FROM identifier INNER JOIN systemmetadata
ON identifier.guid = systemmetadata.guid ORDER BY identifier.guid{limit_query};"""
cursor.execute(query)

@@ -577,21 +601,31 @@ def get_object_metadata_list(self, origin_directory, num):
# Create full object list to store into HashStore
print("Creating list of objects and metadata from metacat db")
object_metadata_list = []
gb_files_to_skip = None
if skip_obj_size is not None:
gb_files_to_skip = skip_obj_size * (1024**3)

for row in rows:
# Get pid, filepath and formatId
pid_guid = row[0]
metadatapath_docid_rev = origin_directory + "/" + row[1] + "." + str(row[2])
metadata_namespace = row[3]
row_checksum = row[4]
row_checksum_algorithm = row[5]
tuple_item = (
pid_guid,
metadatapath_docid_rev,
metadata_namespace,
row_checksum,
row_checksum_algorithm,
)
object_metadata_list.append(tuple_item)
size = row[6]
if gb_files_to_skip is not None and size > gb_files_to_skip:
continue
else:
# Get pid, filepath and formatId
pid_guid = row[0]
metadatapath_docid_rev = (
origin_directory + "/" + row[1] + "." + str(row[2])
)
metadata_namespace = row[3]
row_checksum = row[4]
row_checksum_algorithm = row[5]
tuple_item = (
pid_guid,
metadatapath_docid_rev,
metadata_namespace,
row_checksum,
row_checksum_algorithm,
)
object_metadata_list.append(tuple_item)

# Close the cursor and connection when done
cursor.close()
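
The size filter above converts the `-gbskip` value into bytes with `skip_obj_size * (1024**3)` and drops any row whose `systemmetadata.size` exceeds that threshold. A minimal, self-contained sketch of the same arithmetic and filtering (the `rows` data here is hypothetical, shaped like the query's column order):

```python
# Sketch of the size-skip logic above; row tuples mirror the query's column
# order: (guid, docid, rev, object_format, checksum, checksum_algorithm, size).
GB = 1024**3  # bytes per gigabyte, matching skip_obj_size * (1024**3)

def filter_rows(rows, skip_obj_size=None):
    """Drop rows whose object size in bytes exceeds skip_obj_size (in GB)."""
    threshold = None if skip_obj_size is None else skip_obj_size * GB
    kept = []
    for row in rows:
        size = row[6]
        if threshold is not None and size > threshold:
            continue  # object is larger than the threshold, skip it
        kept.append(row)
    return kept

# Example: with -gbskip=4, a 5 GB object is skipped and a 1 GB object is kept
rows = [
    ("guid1", "doc1", 1, "eml", "cs1", "SHA-256", 5 * GB),
    ("guid2", "doc2", 1, "eml", "cs2", "SHA-256", 1 * GB),
]
assert [r[0] for r in filter_rows(rows, skip_obj_size=4)] == ["guid2"]
```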
@@ -619,35 +653,25 @@ def refine_list_for_objects(self, metacat_obj_list, action):
item_checksum_algorithm = tuple_item[4]
if os.path.exists(filepath_docid_rev):
if action == "store":
# If the file has already been stored, skip it
if not self.hashstore.exists(
"objects", self.hashstore.get_sha256_hex_digest(pid_guid)
):
# This tuple is formed to match 'HashStore' store_object's signature
# Which is '.starmap()'ed when called
store_object_tuple_item = (
pid_guid,
filepath_docid_rev,
None,
item_checksum,
item_checksum_algorithm,
)
refined_object_list.append(store_object_tuple_item)
# This tuple is formed to match 'HashStore' store_object's signature,
# which is '.starmap()'ed when called
store_object_tuple_item = (
pid_guid,
filepath_docid_rev,
None,
item_checksum,
item_checksum_algorithm,
)
refined_object_list.append(store_object_tuple_item)
if action == "retrieve":
if self.hashstore.exists(
"objects", self.hashstore.get_sha256_hex_digest(pid_guid)
):
retrieve_object_tuple_item = (
pid_guid,
item_checksum_algorithm,
item_checksum,
)
refined_object_list.append(retrieve_object_tuple_item)
retrieve_object_tuple_item = (
pid_guid,
item_checksum_algorithm,
item_checksum,
)
refined_object_list.append(retrieve_object_tuple_item)
if action == "delete":
if self.hashstore.exists(
"objects", self.hashstore.get_sha256_hex_digest(pid_guid)
):
refined_object_list.append(pid_guid)
refined_object_list.append(pid_guid)

return refined_object_list
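
The comments in `refine_list_for_objects` note that each tuple matches `store_object`'s positional signature so the whole list can be fanned out with `.starmap()`. A schematic of that pattern with a stand-in `store_object` (the pool setup and function body here are illustrative, not the client's actual code):

```python
from multiprocessing import Pool

# Stand-in with the same positional shape as the tuples built above:
# (pid, path, additional_algorithm, checksum, checksum_algorithm)
def store_object(pid, path, additional_algorithm, checksum, checksum_algorithm):
    print(f"storing {pid} from {path} ({checksum_algorithm}: {checksum})")

refined_object_list = [
    ("guid1", "/var/metacat/data/doc1.1", None, "abc123", "SHA-256"),
    ("guid2", "/var/metacat/data/doc2.1", None, "def456", "SHA-256"),
]

if __name__ == "__main__":
    with Pool(4) as pool:
        # starmap unpacks each tuple into store_object's positional parameters
        pool.starmap(store_object, refined_object_list)
```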

@@ -672,41 +696,22 @@ def refine_list_for_metadata(self, metacat_obj_list, action):
item_checksum_algorithm = tuple_item[4]
if os.path.exists(filepath_docid_rev):
if action == "store":
# If the file has already been stored, skip it
if not self.hashstore.exists(
"metadata",
self.hashstore.get_sha256_hex_digest(
pid_guid + metadata_namespace
),
):
tuple_item = (pid_guid, filepath_docid_rev, metadata_namespace)
refined_metadata_list.append(tuple_item)
tuple_item = (pid_guid, filepath_docid_rev, metadata_namespace)
refined_metadata_list.append(tuple_item)
if action == "retrieve":
if self.hashstore.exists(
"metadata",
self.hashstore.get_sha256_hex_digest(
pid_guid + metadata_namespace
),
):
tuple_item = (
pid_guid,
metadata_namespace,
item_checksum,
item_checksum_algorithm,
)
refined_metadata_list.append(tuple_item)
tuple_item = (
pid_guid,
metadata_namespace,
item_checksum,
item_checksum_algorithm,
)
refined_metadata_list.append(tuple_item)
if action == "delete":
if self.hashstore.exists(
"metadata",
self.hashstore.get_sha256_hex_digest(
pid_guid + metadata_namespace
),
):
tuple_item = (
pid_guid,
metadata_namespace,
)
refined_metadata_list.append(tuple_item)
tuple_item = (
pid_guid,
metadata_namespace,
)
refined_metadata_list.append(tuple_item)
return refined_metadata_list


@@ -777,6 +782,7 @@ def main():
number_of_objects_to_convert = getattr(args, "num_obj_to_convert")
# Determine if we are working with objects or metadata
directory_type = getattr(args, "source_directory_type")
size_of_obj_to_skip = getattr(args, "gb_file_size_to_skip")
accepted_directory_types = ["object", "metadata"]
if directory_type not in accepted_directory_types:
raise ValueError(
@@ -788,18 +794,21 @@ def main():
directory_to_convert,
directory_type,
number_of_objects_to_convert,
size_of_obj_to_skip,
)
if getattr(args, "retrieve_and_validate"):
hashstore_c.retrieve_and_validate_from_hashstore(
directory_to_convert,
directory_type,
number_of_objects_to_convert,
size_of_obj_to_skip,
)
if getattr(args, "delete_from_hashstore"):
hashstore_c.delete_objects_from_list(
directory_to_convert,
directory_type,
number_of_objects_to_convert,
size_of_obj_to_skip,
)
else:
raise FileNotFoundError(
@@ -816,6 +825,13 @@ def main():
print(f"algorithm: {algorithm}")
print(f"Checksum/Hex Digest: {digest}")

elif getattr(args, "client_findobject"):
if pid is None:
raise ValueError("'-pid' option is required")
# Find the content identifier of the object
cid = hashstore_c.hashstore.find_object(pid)
print(f"Content identifier: {cid}")

elif getattr(args, "client_storeobject"):
if pid is None:
raise ValueError("'-pid' option is required")
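Taken together, `main()` now threads the parsed `-gbskip` value into each knbvm helper. A minimal sketch of an equivalent direct call, grounded in the updated signature shown above (the directory path and the `hashstore_c` wiring are placeholders):

```python
# Sketch: store objects listed in the metacat db into HashStore, skipping
# anything larger than 4 GB. 'hashstore_c' stands for a HashStoreClient
# constructed as in main(); the directory path is a placeholder.
hashstore_c.store_to_hashstore_from_list(
    "/var/metacat/data",  # origin_dir: directory to convert
    "object",             # obj_type: 'object' or 'metadata'
    100,                  # num: number of files to store
    4,                    # skip_obj_size: skip objects larger than 4 GB
)
```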
7 changes: 6 additions & 1 deletion tests/test_filehashstore.py
@@ -430,7 +430,12 @@ def test_move_and_get_checksums_duplicates_raises_error(pids, store):
input_stream = io.open(path, "rb")
with pytest.raises(FileExistsError):
# pylint: disable=W0212
store._move_and_get_checksums(pid, input_stream)
store._move_and_get_checksums(
pid,
input_stream,
checksum="nonmatchingchecksum",
checksum_algorithm="sha256",
)
input_stream.close()
assert store.count(entity) == 3
