class CollectionMetadata:
    """Build one import-sheet row for a Fedora collection object.

    The object's MODS datastream is fetched once when the instance is created
    and is then queried with XPath. ``grab_all_metadata`` flattens the result
    into a dict whose keys are the import-sheet column names.

    Args:
        pid: Collection identifier, with or without the ``info:fedora/`` prefix.
    """

    # Matches every namePart of a name whose roleTerm *contains* the role text.
    _ROLE_XPATH = 'mods:name[mods:role/mods:roleTerm[contains(.,"{}")]]/mods:namePart'

    # Roles reported in the ``utk_contributor`` column, in output order.
    CONTRIBUTOR_ROLES = (
        "Contributor",
        "Addressee",
        "Arranger",
        "Associated Name",
        "Autographer",
        "Censor",
        "Choreographer",
        "Client",
        "Contractor",
        "Copyright Holder",
        "Dedicatee",
        "Depicted",
        "Distributor",
        "Donor",
        "Editor",
        "Editor of Compilation",
        "Former Owner",
        "Honoree",
        "Host Institution",
        "Instrumentalist",
        "Interviewer",
        "Issuing Body",
        "Music Copyist",
        "Musical Director",
        "Organizer",
        "Originator",
        "Owner",
        "Performer",
        "Printer",
        "Printer of Plates",
        "Producer",
        "Production Company",
        "Publisher",
        "Restorationist",
        "Set Designer",
        "Signer",
        "Speaker",
        "Stage Director",
        "Stage Manager",
        "Standards Body",
        "Surveyor",
        "Translator",
        "Videographer",
        "Witness",
    )

    # Roles reported in the ``utk_creator`` column, in output order.
    CREATOR_ROLES = (
        "Creator",
        "Architect",
        "Artist",
        "Attributed Name",
        "Author",
        "Binding Designer",
        "Cartographer",
        "Compiler",
        "Composer",
        "Correspondent",
        "Costume Designer",
        "Designer",
        "Engraver",
        "Illustrator",
        "Interviewee",
        "Lithographer",
        "Lyricist",
        "Photographer",
    )

    def __init__(self, pid):
        self.pid = pid
        self.namespaces = {
            "mods": "http://www.loc.gov/mods/v3",
            "xlink": "http://www.w3.org/1999/xlink",
        }
        self.mods = self.get_metadata(pid)

    @classmethod
    def _role_xpaths(cls, roles):
        """Return one namePart XPath per role name, preserving order."""
        return [cls._ROLE_XPATH.format(role) for role in roles]

    def simplify_xpath(self, xpath):
        """Return the text of every node matching ``xpath``, joined by `` | ``.

        Nodes without text are skipped; no match yields an empty string.
        """
        return self.get_text_from_multiple_xpaths([xpath])

    def get_text_from_multiple_xpaths(self, xpaths):
        """Return the text of every node matching any of ``xpaths``, joined by `` | ``.

        Results keep the order of ``xpaths``. A node is reported only once even
        when several expressions select it: the role tests use ``contains()``,
        so e.g. "Editor" also selects "Editor of Compilation" names. Nodes whose
        ``text`` is ``None`` or empty are skipped instead of breaking the join.
        """
        seen_nodes = []  # compared by identity: equal text on distinct nodes is kept
        texts = []
        for xpath in xpaths:
            for node in self.mods.xpath(xpath, namespaces=self.namespaces):
                if any(node is seen for seen in seen_nodes):
                    continue
                seen_nodes.append(node)
                if node.text:
                    texts.append(node.text)
        return " | ".join(texts)

    def grab_all_metadata(self):
        """Return the import-sheet row for this collection as a dict.

        Columns left as ``""`` are not populated from MODS by this class.
        """
        return {
            "source_identifier": self.pid,
            "model": "Collection",
            "parents": "",
            "title": self.simplify_xpath("mods:titleInfo/mods:title"),
            "abstract": self.simplify_xpath("mods:abstract"),
            "contributor": "",
            "utk_contributor": self.get_text_from_multiple_xpaths(
                self._role_xpaths(self.CONTRIBUTOR_ROLES)
            ),
            "creator": "",
            "utk_creator": self.get_text_from_multiple_xpaths(
                self._role_xpaths(self.CREATOR_ROLES)
            ),
            "date_created": self.simplify_xpath(
                "mods:originInfo/mods:dateCreated[not(@encoding)]"
            ),
            "date_issued": self.simplify_xpath(
                "mods:originInfo/mods:dateIssued[not(@encoding)]"
            ),
            "date_created_d": self.simplify_xpath(
                "mods:originInfo/mods:dateCreated[@encoding]"
            ),
            "date_issued_d": self.simplify_xpath(
                "mods:originInfo/mods:dateIssued[@encoding]"
            ),
            "utk_publisher": self.simplify_xpath("mods:originInfo/mods:publisher"),
            "publisher": "",
            "publication_place": "",
            "extent": self.simplify_xpath("mods:physicalDescription/mods:extent"),
            "form": self.simplify_xpath("mods:physicalDescription/mods:form"),
            "subject": "",
            "keyword": self.simplify_xpath("mods:subject/mods:topic"),
            "spatial": "",
            "resource_type": "",
            "note": self.simplify_xpath("mods:note"),
            "repository": "",
            "visibility": self.get_policy(self.pid),
        }

    @staticmethod
    def _fedora_object(pid):
        """Return a FedoraObject for ``pid`` configured from the FEDORA_* environment."""
        return FedoraObject(
            auth=(
                os.environ.get("FEDORA_USERNAME"),
                os.environ.get("FEDORA_PASSWORD"),
            ),
            fedora_uri=os.environ.get("FEDORA_URI"),
            pid=f"{pid.replace('info:fedora/', '').strip()}",
        )

    @staticmethod
    def get_metadata(pid):
        """Fetch the MODS datastream of ``pid`` and return it as a parsed lxml tree."""
        r = CollectionMetadata._fedora_object(pid).streamDatastream("MODS")
        # @Todo: What if MODS doesn't exist?
        return etree.parse(BytesIO(r.content))

    @staticmethod
    def get_policy(pid):
        """Return ``"restricted"`` or ``"open"`` based on the object's POLICY datastream.

        Raises:
            RuntimeError: if the POLICY request fails with anything other than 404.
        """
        r = CollectionMetadata._fedora_object(pid).streamDatastream("POLICY")
        if r.status_code == 404:
            # NOTE(review): assumes an object without a POLICY datastream is
            # unrestricted - confirm against the repository's access rules.
            return "open"
        if r.status_code != 200:
            raise RuntimeError(
                f"Could not read POLICY for {pid}: HTTP {r.status_code}."
            )
        # Restrictions is handed a file path, so the policy is staged on disk first.
        os.makedirs("tmp", exist_ok=True)
        with open("tmp/POLICY.xml", "wb") as f:
            f.write(r.content)
        restrictions = Restrictions("tmp/POLICY.xml").get()
        # A missing key means "not restricted"; the default must therefore be falsy.
        if restrictions.get("work_restricted", False):
            return "restricted"
        return "open"


class CollectionImporter:
    """Collect metadata rows for a list of collection PIDs and write them to CSV.

    Args:
        collections: Iterable of collection PIDs; every PID is fetched on creation.
    """

    def __init__(self, collections):
        self.collections = collections
        self.collection_metadata = self.__build_collections()
        # Column order comes from the first row; an empty run simply has no columns.
        self.headers = (
            list(self.collection_metadata[0]) if self.collection_metadata else []
        )

    def __build_collections(self):
        """Return one metadata dict per collection, showing a progress bar."""
        return [
            CollectionMetadata(collection).grab_all_metadata()
            for collection in tqdm(self.collections)
        ]

    def write_csv(self, filename):
        """Write the header row and every collected row to ``filename``."""
        # Explicit UTF-8 so non-ASCII metadata does not depend on the platform default.
        with open(filename, "w", newline="", encoding="utf-8") as bulkrax_sheet:
            writer = csv.DictWriter(bulkrax_sheet, fieldnames=self.headers)
            writer.writeheader()
            writer.writerows(self.collection_metadata)
@cli.command(
    "generate_collection_metadata",
    help="Generate metadata for a collection.",
)
@click.option(
    "--collection",
    "-l",
    required=False,
    help="Specify the collection you want to download metadata for.",
)
@click.option(
    "--output",
    "-o",
    required=False,
    default="tmp/collections.csv",
    help="Specify where to write output.",
)
def generate_collection_metadata(
    collection: str,
    output: str,
) -> None:
    """Write an import sheet for one collection, or for all of them.

    Args:
        collection: PID of a single collection; when omitted, every collection
            returned by ``ResourceIndexSearch.find_all_collections`` is used.
        output: Path of the CSV file to write.
    """
    if collection:
        print(f"Generating metadata for {collection}.")
        pids = [collection]
    else:
        print("Generating metadata for all collections.")
        pids = ResourceIndexSearch().find_all_collections()
    if not pids:
        # The importer derives its columns from the first row, so stop early
        # rather than fail on an empty result.
        print("No collections found. Nothing written.")
        return
    importer = CollectionImporter(pids)
    importer.write_csv(output)
    # Report the path actually written, not a hard-coded one.
    print(f"Done. Metadata written to {output}.")
+ ] def get_policies_based_on_type_and_collection(self, work_type, collection): iri = self.__lookup_work_type(work_type).strip() @@ -219,10 +227,18 @@ def get_policies_based_on_type_and_collection(self, work_type, collection): ) results = requests.get(f"{self.base_url}&query={query}").content.decode("utf-8") if work_type != "book": - return [result for result in results.split("\n") if result != "" and result != '"pid"'] + return [ + result + for result in results.split("\n") + if result != "" and result != '"pid"' + ] else: all_policies_from_book = [] - books = [result for result in results.split("\n") if result != "" and result != '"pid"'] + books = [ + result + for result in results.split("\n") + if result != "" and result != '"pid"' + ] for book in books: all_policies_from_book.append(book) all_policies_from_book.extend(self.get_policies_for_pages_in_book(book)) @@ -246,8 +262,39 @@ def get_page_number(self, pid): results = requests.get(f"{self.base_url}&query={query}").content.decode("utf-8") return results.split("\n")[1] + def find_all_collections(self): + ignore = ( + "info:fedora/islandora:root", + "info:fedora/islandora:sp_large_image_collection", + "info:fedora/islandora:sp_basic_image_collection", + "info:fedora/islandora:manuscriptCollection", + "info:fedora/islandora:compound_collection", + "info:fedora/islandora:transformCollection", + "info:fedora/islandora:bookCollection", + "info:fedora/islandora:binary_object_collection", + "info:fedora/islandora:audio_collection", + "info:fedora/islandora:sp_pdf_collection", + "info:fedora/islandora:video_collection", + "info:fedora/digital:collections", + "info:fedora/ir:citationCollection", + "info:fedora/islandora:oralhistories_collection", + "info:fedora/collections:test", + "info:fedora/collections:rftatest", + ) + query = quote( + "SELECT ?collection WHERE { ?collection . 
}" + ) + results = requests.get(f"{self.base_url}&query={query}").content.decode("utf-8") + return [ + result.replace("info:fedora/", "") + for result in results.split("\n") + if result != "" and result not in ignore and result != '"collection"' + ] + if __name__ == "__main__": risearch = ResourceIndexSearch() - x = risearch.get_policies_based_on_type_and_collection("book", "collections:galston") + x = risearch.get_policies_based_on_type_and_collection( + "book", "collections:galston" + ) print(x)