From 59b1f42297bcdbd31c614fed41bd0b02647ba449 Mon Sep 17 00:00:00 2001 From: Anthony Mahanna Date: Fri, 19 Jan 2024 22:47:38 -0500 Subject: [PATCH] final main checkpoint: `arango_rdf` --- arango_rdf/abc.py | 2 - arango_rdf/main.py | 453 +++++++++++++++++++++++++++------------------ 2 files changed, 268 insertions(+), 187 deletions(-) diff --git a/arango_rdf/abc.py b/arango_rdf/abc.py index 26c85c40..682f74dc 100644 --- a/arango_rdf/abc.py +++ b/arango_rdf/abc.py @@ -25,7 +25,6 @@ def rdf_to_arangodb_by_rpt( flatten_reified_triples: bool, use_hashed_literals_as_keys: bool, overwrite_graph: bool, - use_async: bool, batch_size: Optional[int], **adb_import_kwargs: Any, ) -> ADBGraph: @@ -40,7 +39,6 @@ def rdf_to_arangodb_by_pgt( contextualize_graph: bool, flatten_reified_triples: bool, overwrite_graph: bool, - use_async: bool, batch_size: Optional[int], **adb_import_kwargs: Any, ) -> ADBGraph: diff --git a/arango_rdf/main.py b/arango_rdf/main.py index 985ee928..6204786a 100644 --- a/arango_rdf/main.py +++ b/arango_rdf/main.py @@ -92,8 +92,6 @@ def __init__( raise TypeError(msg) self.__db = db - self.__async_db = db.begin_async_execution(return_result=False) - self.__cntrl = controller self.__cntrl.db = db @@ -127,8 +125,8 @@ def __init__( for ns in os.listdir(f"{PROJECT_DIR}/meta"): self.__meta_graph.parse(f"{PROJECT_DIR}/meta/{ns}", format="trig") - # A mapping of Reified Subjects to their corresponding RDF Predicates. - self.__reified_subject_predicate_map: Dict[RDFTerm, URIRef] = {} + # A mapping of Reified Subjects to their corresponding ArangoDB Edge. + self.__reified_subject_map: Dict[Union[URIRef, BNode], Tuple[str, str, str]] # Commonly used URIs self.__rdfs_resource_str = str(RDFS.Resource) @@ -276,7 +274,10 @@ def arangodb_to_rdf( adb_e_cols = set(metagraph.get("edgeCollections", {})) - # PGT Scenario: Build a mapping of the RDF Predicates stored in ArangoDB + ####################### + # PGT: Round-Tripping # + ####################### + if self.db.has_collection("Property"): doc: Json for doc in self.db.collection("Property"): @@ -501,7 +502,6 @@ def rdf_to_arangodb_by_rpt( flatten_reified_triples: bool = True, use_hashed_literals_as_keys: bool = True, overwrite_graph: bool = False, - use_async: bool = False, batch_size: Optional[int] = None, **adb_import_kwargs: Any, ) -> ADBGraph: @@ -525,10 +525,9 @@ def rdf_to_arangodb_by_rpt( :param name: The name of the RDF Graph :type name: str - :param rdf_graph: The RDF Graph object. NOTE: This method does not - currently support RDF graphs of type `rdflib.graph.Dataset`. - Consider using `rdflib.graph.ConjunctiveGraph` if using quads - instead of triples is required. + :param rdf_graph: The RDF Graph object. NOTE: This object + is modified in-place in order for PGT to work. Do not + expect the original state of **rdf_graph** to be preserved. :type: rdf_graph: rdflib.graph.Graph :param contextualize_graph: A work-in-progress flag that seeks to enhance the Terminology Box of **rdf_graph** by providing @@ -561,9 +560,7 @@ def rdf_to_arangodb_by_rpt( by **name** if it already exists, and drops its associated collections. Defaults to False. :type overwrite_graph: bool - :param use_async: Performs asynchronous ArangoDB ingestion if enabled. Defaults to False. - :type use_async: bool :param batch_size: If specified, runs the ArangoDB Data Ingestion process for every **batch_size** RDF triples/quads within **rdf_graph**. Defaults to `len(rdf_graph)`. @@ -578,7 +575,9 @@ def rdf_to_arangodb_by_rpt( if isinstance(rdf_graph, RDFDataset): # pragma: no cover m = """ Invalid type for **rdf_graph**: ArangoRDF does not yet - support RDF Graphs of type `rdflib.graph.Dataset` + support RDF Graphs of type rdflib.graph.Dataset. Consider + using rdflib.graph.ConjunctiveGraph if using quads instead + of triples is required. """ raise TypeError(m) @@ -603,6 +602,10 @@ def rdf_to_arangodb_by_rpt( if overwrite_graph: self.db.delete_graph(name, ignore_missing=True, drop_collections=True) + ################################# + # Graph Contextualization (WIP) # + ################################# + # NOTE: Graph Contextualization is an experimental work-in-progress contextualize_statement_func = empty_func if contextualize_graph: @@ -619,12 +622,18 @@ def rdf_to_arangodb_by_rpt( self.__domain_range_map = self.__build_domain_range_map() self.__type_map = self.__combine_type_map_and_dr_map() - self.__reified_subject_predicate_map = {} - if flatten_reified_triples: - self.__flatten_reified_triples(contextualize_statement_func, is_pgt=False) + ########################### + # Flatten Reified Triples # + ########################### - rdf_graph_size = len(rdf_graph) - batch_size = batch_size or rdf_graph_size + self.__reified_subject_map = {} + if flatten_reified_triples: + self.__flatten_reified_triples( + self.__rpt_process_subject_predicate_object, + contextualize_statement_func, + batch_size, + adb_import_kwargs, + ) s: RDFTerm # Subject p: URIRef # Predicate @@ -636,6 +645,12 @@ def rdf_to_arangodb_by_rpt( else rdf_graph.triples ) + ############# + # RPT: Main # + ############# + + rdf_graph_size = len(rdf_graph) + batch_size = batch_size or rdf_graph_size bar_progress = get_bar_progress("(RDF → ADB): RPT", "#BF23C4") bar_progress_task = bar_progress.add_task("", total=rdf_graph_size) spinner_progress = get_import_spinner_progress(" ") @@ -649,13 +664,10 @@ def rdf_to_arangodb_by_rpt( ) if i % batch_size == 0: - self.__insert_adb_docs( - spinner_progress, use_async, **adb_import_kwargs - ) + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) - self.__insert_adb_docs(spinner_progress, use_async, **adb_import_kwargs) + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) - rdf_graph += self.__adb_key_statements return self.__rpt_create_adb_graph(name) ################################# @@ -671,7 +683,6 @@ def rdf_to_arangodb_by_pgt( contextualize_graph: bool = False, flatten_reified_triples: bool = True, overwrite_graph: bool = False, - use_async: bool = False, batch_size: Optional[int] = None, **adb_import_kwargs: Any, ) -> ADBGraph: @@ -798,21 +809,25 @@ def rdf_to_arangodb_by_pgt( :param name: The name of the RDF Graph :type name: str - :param rdf_graph: The RDF Graph object. NOTE: This method does not - currently support RDF graphs of type `rdflib.graph.Dataset`. - Consider using `rdflib.graph.ConjunctiveGraph` if using quads - instead of triples is required. + :param rdf_graph: The RDF Graph object. NOTE: This object + is modified in-place in order for PGT to work. Do not + expect the original state of **rdf_graph** to be preserved. :type: rdf_graph: rdflib.graph.Graph :param adb_col_statements: An optional RDF Graph containing ArangoDB Collection statements of the form - (adb_vertex http://arangodb/collection "adb_v_col") and + (adb_vertex http://arangodb/collection "adb_v_col"). If specified, will be used to determine the ArangoDB Collection - mapping of RDF Resources within **rdf_graph**. Defaults to None. + of RDF Resources within **rdf_graph**. Defaults to None. + NOTE: Cannot be used in conjunction with collection statements in + **rdf_graph**. :type adb_col_statements: rdflib.graph.Graph | None - :param write_adb_col_statements: If set to True, will write the ArangoDB + :param write_adb_col_statements: Run the ArangoDB Collection + Mapping Process for **rdf_graph** to write the ArangoDB Collection statements of the form (adb_vertex http://arangodb/collection "adb_v_col") - into **adb_col_statements**. Defaults to True. + into **adb_col_statements**. This parameter is ignored if + **contextualize_graph** is set to True, as the ArangoDB + Collection Mapping Process is required for Graph Contextualization. :type write_adb_col_statements: bool :param contextualize_graph: A work-in-progress flag that seeks to enhance the Terminology Box of **rdf_graph** by providing @@ -834,9 +849,6 @@ def rdf_to_arangodb_by_pgt( by **name** if it already exists, and drops its associated collections. Defaults to False. :type overwrite_graph: bool - :param use_async: Performs asynchronous ArangoDB ingestion if enabled. - Defaults to False. - :type use_async: bool :param batch_size: If specified, runs the ArangoDB Data Ingestion process for every **batch_size** RDF triples/quads within **rdf_graph**. Defaults to `len(rdf_graph)`. @@ -855,24 +867,12 @@ def rdf_to_arangodb_by_pgt( if isinstance(rdf_graph, RDFDataset): # pragma: no cover m = """ Invalid type for **rdf_graph**: ArangoRDF does not yet - support RDF Graphs of type rdflib.graph.Dataset + support RDF Graphs of type rdflib.graph.Dataset. Consider + using rdflib.graph.ConjunctiveGraph if using quads instead + of triples is required. """ raise TypeError(m) - if (None, self.adb_col_uri, None) in rdf_graph: - m = """ - Invalid RDF Graph: ArangoRDF does not yet - support RDF Graphs with the following statement: - (None, self.adb_col_uri, None) . - Please consider using `ArangoRDF.extract_adb_col_statements` - to remove this statement from the RDF Graph, and then - re-run this method with the **adb_col_statements** parameter. - """ - raise ValueError(m) - - if overwrite_graph: - self.db.delete_graph(name, ignore_missing=True, drop_collections=True) - self.__rdf_graph = rdf_graph self.__adb_key_statements = self.extract_adb_key_statements(rdf_graph) @@ -893,7 +893,20 @@ def rdf_to_arangodb_by_pgt( self.__pgt_remove_blacklisted_statements() - # NOTE: Graph Contextualization is an experimental work-in-progress + s: RDFTerm # Subject + p: URIRef # Predicate + o: RDFTerm # Object + + statements = ( + self.__rdf_graph.quads + if isinstance(self.__rdf_graph, RDFConjunctiveGraph) + else self.__rdf_graph.triples + ) + + ################################# + # Graph Contextualization (WIP) # + ################################# + contextualize_statement_func = empty_func if contextualize_graph: contextualize_statement_func = self.__pgt_contextualize_statement @@ -907,7 +920,30 @@ def rdf_to_arangodb_by_pgt( self.__e_col_map[label]["from"].add("Property") self.__e_col_map[label]["to"].add("Class") - self.__adb_col_statements = adb_col_statements or RDFGraph() + ################################## + # ArangoDB Collection Statements # + ################################## + + rdf_graph_has_adb_col_statements = (None, self.adb_col_uri, None) in rdf_graph + if adb_col_statements and rdf_graph_has_adb_col_statements: + m = """ + Ambiguity Error: Cannot specify both **adb_col_statements** + and **rdf_graph** with ArangoDB Collection statements. + """ + raise Exception(m) + + elif adb_col_statements: + self.__adb_col_statements = adb_col_statements + + elif rdf_graph_has_adb_col_statements: + self.__adb_col_statements = self.extract_adb_col_statements(rdf_graph) + + else: + self.__adb_col_statements = RDFGraph() + + if overwrite_graph: + self.db.delete_graph(name, ignore_missing=True, drop_collections=True) + if write_adb_col_statements or contextualize_graph: # Enabling Graph Contextualization forces # us to run the ArangoDB Collection Mapping algorithm @@ -916,24 +952,33 @@ def rdf_to_arangodb_by_pgt( rdf_graph, self.__adb_col_statements, contextualize_graph ) - self.__reified_subject_predicate_map = {} + ########################### + # Flatten Reified Triples # + ########################### + + self.__reified_subject_map = {} if flatten_reified_triples: - self.__flatten_reified_triples(contextualize_statement_func, is_pgt=True) + self.__flatten_reified_triples( + self.__pgt_process_subject_predicate_object, + contextualize_statement_func, + batch_size, + adb_import_kwargs, + ) - s: RDFTerm # Subject - p: URIRef # Predicate - o: RDFTerm # Object + ########################### + # PGT: Literal Statements # + ########################### - statements = ( - self.__rdf_graph.quads - if isinstance(self.__rdf_graph, RDFConjunctiveGraph) - else self.__rdf_graph.triples + self.__pgt_parse_literal_statements( + statements, + contextualize_statement_func, + batch_size, + adb_import_kwargs, ) - # TODO: - # self.__pgt_parse_rdf_lists() - - self.__pgt_parse_literal_statements(statements, contextualize_statement_func) + ############# + # PGT: Main # + ############# rdf_graph_size = len(rdf_graph) batch_size = batch_size or rdf_graph_size @@ -961,18 +1006,19 @@ def rdf_to_arangodb_by_pgt( ) if i % batch_size == 0: - self.__insert_adb_docs( - spinner_progress, use_async, **adb_import_kwargs - ) + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) + + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) - self.__insert_adb_docs(spinner_progress, use_async, **adb_import_kwargs) + ################## + # PGT: RDF Lists # + ################## bar_progress = get_bar_progress("(RDF → ADB): PGT [RDF Lists]", "#EF7D00") with Live(Group(bar_progress, spinner_progress)): self.__pgt_process_rdf_lists(bar_progress) - self.__insert_adb_docs(spinner_progress, use_async) + self.__insert_adb_docs(spinner_progress) - rdf_graph += self.__adb_key_statements return self.__pgt_create_adb_graph(name) def write_adb_col_statements( @@ -1081,12 +1127,8 @@ def write_adb_col_statements( # 5. Finalize **adb_col_statements** for rdf_map in [self.__explicit_type_map, self.__domain_range_map]: for rdf_resource, class_set in rdf_map.items(): - has_mapping = ( - rdf_resource, - None, - None, - ) in self.__adb_col_statements - if has_mapping or len(class_set) == 0: + t = (rdf_resource, None, None) + if t in self.__adb_col_statements or len(class_set) == 0: continue # pragma: no cover # (false negative) adb_col = self.rdf_id_to_adb_label( @@ -1097,15 +1139,7 @@ def write_adb_col_statements( self.__add_adb_col_statement(rdf_resource, adb_col) - self.__adb_col_statements.remove( - (self.adb_col_uri, self.adb_col_uri, Literal("Property")) - ) - - self.__adb_col_statements.remove( - (self.adb_key_uri, self.adb_col_uri, Literal("Property")) - ) - - return RDFGraph() + self.__adb_col_statements + return self.__adb_col_statements ####################################### # Public: RDF -> ArangoDB (RPT & PGT) # @@ -1660,7 +1694,7 @@ def __rpt_process_subject_predicate_object( p: URIRef, o: RDFTerm, sg: Optional[List[Any]], - reified_triple_key: Optional[str], + reified_subject: Optional[Union[URIRef, BNode]], contextualize_statement_func: Callable[..., None], ) -> None: """RDF -> ArangoDB (RPT): Processes the RDF Statement (s, p, o) @@ -1674,9 +1708,10 @@ def __rpt_process_subject_predicate_object( :type o: URIRef | BNode | Literal :param sg: The Sub Graph URI of the (s,p,o) statement, if any. :type sg: URIRef | None - :param reified_triple_key: The ArangoDB Document Key of - the triple (s,p,o) if the triple is reified. Defaults to None. - :type reified_triple_key: str | None + :param reified_subject: The RDF Subject of the RDF Statement + (s, p, o) that was originally in Reified form. Only used + during `ArangoRDF.__flatten_reified_triples()`. + :type reified_subject: URIRef | BNode | None :param contextualize_statement_func: A function that contextualizes an RDF Statement. A no-op function is used if Graph Contextualization is disabled. @@ -1688,7 +1723,7 @@ def __rpt_process_subject_predicate_object( o_meta = self.__rpt_process_term(o) - self.__rpt_process_statement(s_meta, p, o_meta, sg_str, reified_triple_key) + self.__rpt_process_statement(s_meta, p, o_meta, sg_str, reified_subject) contextualize_statement_func(s_meta, p, o_meta, sg_str) @@ -1709,9 +1744,11 @@ def __rpt_process_term(self, t: RDFTerm) -> RDFTermMeta: t_key = self.rdf_id_to_adb_key(t_str, t) t_label = "" - if t in self.__reified_subject_predicate_map: + if t in self.__reified_subject_map: t_col = self.__STATEMENT_COL + # TODO: Populate adb docs? Or uncessary? + elif type(t) is URIRef: t_col = self.__URIREF_COL t_label = self.rdf_id_to_adb_label(t_str) @@ -1762,7 +1799,7 @@ def __rpt_process_statement( p: URIRef, o_meta: RDFTermMeta, sg_str: str, - reified_triple_key: Optional[str] = None, + reified_subject: Optional[Union[URIRef, BNode]] = None, ) -> None: """RDF -> ArangoDB (RPT): Processes the RDF Statement (s, p, o) as an ArangoDB edge for RPT. @@ -1776,6 +1813,10 @@ def __rpt_process_statement( :param sg_str: The string representation of the sub-graph URIRef associated to this statement (if any). :type sg_str: str + :param reified_subject: The RDF Subject of the RDF Statement + (s, p, o) that was originally in Reified form. Only used + during `ArangoRDF.__flatten_reified_triples()`. + :type reified_subject: URIRef | BNode | None """ _, s_col, s_key, _ = s_meta _, o_col, o_key, _ = o_meta @@ -1784,16 +1825,23 @@ def __rpt_process_statement( p_key = self.rdf_id_to_adb_key(p_str) p_label = self.rdf_id_to_adb_label(p_str) - e_key = reified_triple_key or self.hash(f"{s_key}-{p_key}-{o_key}") + _from = f"{s_col}/{s_key}" + _to = f"{o_col}/{o_key}" + + if reified_subject: + e_key = self.rdf_id_to_adb_key(str(reified_subject), reified_subject) + self.__reified_subject_map[reified_subject] = (_from, p_label, _to) + else: + e_key = self.hash(f"{s_key}-{p_key}-{o_key}") self.__add_adb_edge( - col=self.__STATEMENT_COL, - key=e_key, - _from=f"{s_col}/{s_key}", - _to=f"{o_col}/{o_key}", - _uri=p_str, - _label=p_label, - _sg=sg_str, + self.__STATEMENT_COL, + e_key, + _from, + _to, + p_str, + p_label, + sg_str, ) def __rpt_contextualize_statement( @@ -1893,6 +1941,8 @@ def __pgt_parse_literal_statements( self, statements: Callable[..., Generator[_OptionalQuadType, None, None]], pgt_contextualize_statement_func: Callable[..., None], + batch_size: Optional[int], + adb_import_kwargs: Dict[str, Any], ) -> None: """RDF -> ArangoDB (PGT): Pre-processes all RDF Literal statements (i.e "Datatype Property Statements") within the RDF Graph. @@ -1928,11 +1978,14 @@ def __pgt_parse_literal_statements( data = self.__rdf_graph.query(query) + total = len(data) + batch_size = batch_size or total bar_progress = get_bar_progress("(RDF → ADB): PGT [RDF Literals]", "#EF7D00") - bar_progress_task = bar_progress.add_task("", total=len(data) - 1) + bar_progress_task = bar_progress.add_task("", total=total - 1) + spinner_progress = get_spinner_progress(" ") - with Live(Group(bar_progress)): - for s, p in self.__rdf_graph.query(query): + with Live(Group(bar_progress, spinner_progress)): + for i, (s, p) in enumerate(self.__rdf_graph.query(query)): s_meta = self.__pgt_get_term_metadata(s) self.__pgt_process_rdf_term(s_meta) @@ -1951,13 +2004,18 @@ def __pgt_parse_literal_statements( bar_progress.advance(bar_progress_task) + if i % batch_size == 0: + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) + + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) + def __pgt_process_subject_predicate_object( self, s: RDFTerm, p: URIRef, o: RDFTerm, sg: Optional[List[Any]], - reified_triple_key: Optional[str], + reified_subject: Optional[Union[URIRef, BNode]], contextualize_statement_func: Callable[..., None], ) -> None: """RDF -> ArangoDB (PGT): Processes the RDF Statement (s, p, o) @@ -1971,9 +2029,10 @@ def __pgt_process_subject_predicate_object( :type o: URIRef | BNode | Literal :param sg: The Sub Graph URI of the (s,p,o) statement, if any. :type sg: URIRef | None - :param reified_triple_key: The ArangoDB Document Key of - the triple (s,p,o) if the triple is reified. Defaults to None. - :type reified_triple_key: str | None + :param reified_subject: The RDF Subject of the RDF Statement + (s, p, o) that was originally in Reified form. Only used + during `ArangoRDF.__flatten_reified_triples()`. + :type reified_subject: URIRef | BNode | None :param contextualize_statement_func: A function that contextualizes an RDF Statement. A no-op function is used if Graph Contextualization is disabled. @@ -1990,7 +2049,7 @@ def __pgt_process_subject_predicate_object( o_meta = self.__pgt_get_term_metadata(o) self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str) - self.__pgt_process_statement(s_meta, p_meta, o_meta, sg_str, reified_triple_key) + self.__pgt_process_statement(s_meta, p_meta, o_meta, sg_str, reified_subject) contextualize_statement_func(s_meta, p_meta, o_meta, sg_str) @@ -2016,8 +2075,9 @@ def __pgt_get_term_metadata(self, t: Union[URIRef, BNode, Literal]) -> RDFTermMe t_key = self.rdf_id_to_adb_key(t_str, t) t_label = self.rdf_id_to_adb_label(t_str) - if p := self.__reified_subject_predicate_map.get(t): - t_col = t_label = self.rdf_id_to_adb_label(str(p)) + if data := self.__reified_subject_map.get(t): + _, p_label, _ = data + t_col = t_label = p_label else: t_col = str( @@ -2094,7 +2154,15 @@ def __pgt_process_rdf_term( if t_key in self.__adb_docs.get(t_col, {}): return - if type(t) is URIRef: + if t in self.__reified_subject_map: + _from, _, _to = self.__reified_subject_map[t] + self.__adb_docs[t_col][t_key] = { + "_key": t_key, + "_from": _from, + "_to": _to, + } + + elif type(t) is URIRef: self.__adb_docs[t_col][t_key] = { "_key": t_key, "_uri": str(t), @@ -2159,7 +2227,7 @@ def __pgt_process_statement( p_meta: RDFTermMeta, o_meta: RDFTermMeta, sg_str: str, - reified_triple_key: Optional[str] = None, + reified_subject: Optional[Union[URIRef, BNode]] = None, ) -> None: """RDF -> ArangoDB (PGT): Processes the RDF Statement (s, p, o) as an ArangoDB Edge for PGT. @@ -2179,6 +2247,10 @@ def __pgt_process_statement( :param sg_str: The string representation of the sub-graph URIRef associated to this statement (if any). :type sg_str: str + :param reified_subject: The RDF Subject of the RDF Statement + (s, p, o) that was originally in Reified form. Only used + during `ArangoRDF.__flatten_reified_triples()`. + :type reified_subject: URIRef | BNode | None """ o, o_col, o_key, _ = o_meta @@ -2188,13 +2260,20 @@ def __pgt_process_statement( _, s_col, s_key, _ = s_meta p, _, p_key, p_label = p_meta - e_key = reified_triple_key or self.hash(f"{s_key}-{p_key}-{o_key}") + _from = f"{s_col}/{s_key}" + _to = f"{o_col}/{o_key}" + + if reified_subject: + e_key = self.rdf_id_to_adb_key(str(reified_subject), reified_subject) + self.__reified_subject_map[reified_subject] = (_from, p_label, _to) + else: + e_key = self.hash(f"{s_key}-{p_key}-{o_key}") self.__add_adb_edge( p_label, # local name of predicate URI is used as the collection name e_key, - f"{s_col}/{s_key}", - f"{o_col}/{o_key}", + _from, + _to, str(p), p_label, sg_str, @@ -2545,89 +2624,97 @@ def __load_meta_ontology(self, rdf_graph: RDFGraph) -> RDFConjunctiveGraph: return graph def __flatten_reified_triples( - self, contextualize_statement_func: Callable[..., None], is_pgt: bool + self, + process_subject_predicate_object: Callable[..., None], + contextualize_statement_func: Callable[..., None], + batch_size: Optional[int], + adb_import_kwargs: Dict[str, Any], ) -> None: """RDF -> ArangoDB: Parse all reified triples within the RDF Graph if Reified Triple Simplification is enabled. NOTE: This modifies the RDF Graph in-place. TODO: Revisit + :param process_subject_predicate_object: A function that processes + the RDF Statement (s, p, o) as an ArangoDB document. Either + `__rpt_process_subject_predicate_object` or + `__pgt_process_subject_predicate_object`. + :type process_subject_predicate_object: Callable[..., None] :param contextualize_statement_func: A function that contextualizes an RDF Statement. A no-op function is used if Graph Contextualization is disabled. :type contextualize_statement_func: Callable[..., None] - :param is_pgt: Whether the RDF Graph is being processed by PGT or RPT. - :type is_pgt: bool """ - if isinstance(self.__rdf_graph, RDFConjunctiveGraph): - query = """ - SELECT ?subject ?stmtSubject ?stmtPredicate ?stmtObject ?g - WHERE { - ?subject a rdf:Statement ; - rdf:subject ?stmtSubject ; - rdf:predicate ?stmtPredicate ; - rdf:object ?stmtObject . - OPTIONAL { GRAPH ?h { ?subject a rdf:Statement . } } - BIND(IF(BOUND(?h), ?h, iri("")) AS ?g) - # TODO: Figure out why UNDEF is not working - } - """ + graph_supports_quads = isinstance(self.__rdf_graph, RDFConjunctiveGraph) + + # Recursion is used to process nested reified triples + # Things can get really wild here... + def process_reified_subject( + reified_subject: RDFTerm, sg: Optional[List[Any]] + ) -> None: + s = self.__rdf_graph.value(reified_subject, RDF.subject) + p = self.__rdf_graph.value(reified_subject, RDF.predicate) + o = self.__rdf_graph.value(reified_subject, RDF.object) + + for t in [(s, RDF.type, RDF.Statement), (o, RDF.type, RDF.Statement)]: + if t in self.__rdf_graph: + new_reified_subject = t[0] + if graph_supports_quads: + new_sg = list(self.__rdf_graph.contexts(t)) + process_reified_subject(new_reified_subject, new_sg) + else: + process_reified_subject(new_reified_subject, sg) + + process_subject_predicate_object( + s, p, o, sg, reified_subject, contextualize_statement_func + ) - else: - query = """ - SELECT ?subject ?stmtSubject ?stmtPredicate ?stmtObject - WHERE { - ?subject a rdf:Statement ; - rdf:subject ?stmtSubject ; - rdf:predicate ?stmtPredicate ; - rdf:object ?stmtObject . - } + # Remove the reified triple from the RDF Graph + # once it has been processed + self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement)) + self.__rdf_graph.remove((reified_subject, RDF.subject, s)) + self.__rdf_graph.remove((reified_subject, RDF.predicate, p)) + self.__rdf_graph.remove((reified_subject, RDF.object, o)) + + graph_return = "" + graph_clause = "" + if graph_supports_quads: + graph_return = "?g" + graph_clause = """ + OPTIONAL { GRAPH ?h { ?reified_subject a rdf:Statement . } } + BIND(IF(BOUND(?h), ?h, iri("")) AS ?g) + # TODO: Figure out why UNDEF is not working """ - data = self.__rdf_graph.query(query) - - reified_subject: RDFTerm - s: RDFTerm - p: URIRef - o: RDFTerm - - process_subject_predicate_object = ( - self.__pgt_process_subject_predicate_object - if is_pgt - else self.__rpt_process_subject_predicate_object - ) - - self.__reified_subject_predicate_map = {} - - m = "(RDF → ADB): Flatten Reified Triples [Prep]" - bar_progress = get_bar_progress(m, "#FFFFFF") - bar_progress_task = bar_progress.add_task("", total=len(data)) - - with Live(Group(bar_progress)): - for reified_subject, _, p, *_ in data: - self.__reified_subject_predicate_map[reified_subject] = p - bar_progress.advance(bar_progress_task) + query = f""" + SELECT ?reified_subject {graph_return} + WHERE {{ + ?reified_subject a rdf:Statement . + {graph_clause} + }} + """ + data = self.__rdf_graph.query(query) + total = len(data) + batch_size = batch_size or total m = "(RDF → ADB): Flatten Reified Triples" bar_progress = get_bar_progress(m, "#FFFFFF") - bar_progress_task = bar_progress.add_task("", total=len(data)) + bar_progress_task = bar_progress.add_task("", total=total) + spinner_progress = get_import_spinner_progress(" ") - with Live(Group(bar_progress)): - for reified_subject, s, p, o, *sg in data: - reified_triple_key = self.rdf_id_to_adb_key( - str(reified_subject), reified_subject - ) + with Live(Group(bar_progress, spinner_progress)): + for i, (reified_subject, *sg) in enumerate(data): + # Only process the reified triple if it has not been processed yet + # i.e recursion + if reified_subject not in self.__reified_subject_map: + process_reified_subject(reified_subject, sg) - process_subject_predicate_object( - s, p, o, sg, reified_triple_key, contextualize_statement_func - ) + bar_progress.advance(bar_progress_task) - self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement)) - self.__rdf_graph.remove((reified_subject, RDF.subject, s)) - self.__rdf_graph.remove((reified_subject, RDF.predicate, p)) - self.__rdf_graph.remove((reified_subject, RDF.object, o)) + if i % batch_size == 0: + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) - bar_progress.advance(bar_progress_task) + self.__insert_adb_docs(spinner_progress, **adb_import_kwargs) def __get_subgraph_str(self, possible_sg: Optional[List[Any]]) -> str: """RDF -> ArangoDB: Extract the sub-graph URIRef string of a quad (if any). @@ -2649,7 +2736,7 @@ def __get_subgraph_str(self, possible_sg: Optional[List[Any]]) -> str: if type(sg_identifier) is BNode: return "" # TODO: Revisit - raise ValueError("Sub Graph Identifier is not a URIRef or BNode.") + raise ValueError(f"Sub Graph Identifier is not a URIRef or BNode: {sg}") def __add_adb_edge( self, @@ -3009,14 +3096,12 @@ def __get_literal_val(self, t: Literal, t_str: str) -> Any: return t.value if t.value is not None else t_str def __insert_adb_docs( - self, spinner_progress: Progress, use_async: bool, **adb_import_kwargs: Any + self, spinner_progress: Progress, **adb_import_kwargs: Any ) -> None: """RDF -> ArangoDB: Insert ArangoDB documents into their ArangoDB collection. :param spinner_progress: The spinner progress bar. :type spinner_progress: rich.progress.Progress - :param use_async: Performs asynchronous ingestion if enabled. - :type use_async: bool :param adb_import_kwargs: Keyword arguments to specify additional parameters for ArangoDB document insertion. Full parameter list: https://docs.python-arango.com/en/main/specs.html#arango.collection.Collection.import_bulk @@ -3027,8 +3112,6 @@ def __insert_adb_docs( adb_import_kwargs["on_duplicate"] = "update" - db = self.__async_db if use_async else self.db - # Avoiding "RuntimeError: dictionary changed size during iteration" adb_cols = list(self.__adb_docs.keys()) @@ -3042,7 +3125,7 @@ def __insert_adb_docs( is_edge = col in self.__e_col_map self.db.create_collection(col, edge=is_edge) - result = db.collection(col).import_bulk(doc_list, **adb_import_kwargs) + result = self.db.collection(col).import_bulk(doc_list, **adb_import_kwargs) logger.debug(result) del self.__adb_docs[col]