diff --git a/arango_rdf/main.py b/arango_rdf/main.py index 2239f38..5e7fb04 100644 --- a/arango_rdf/main.py +++ b/arango_rdf/main.py @@ -596,18 +596,19 @@ def rdf_to_arangodb_by_rpt( self.__reified_subject_predicate_map = {} if simplify_reified_triples: - self.__parse_reified_triples(contextualize_statement_func, is_pgt=False) + self.__parse_reified_triples( + lambda s, p, o, sg, reified_triple_key: self.__rpt_process_subject_predicate_object( + s, p, o, sg, reified_triple_key, contextualize_statement_func + ) + ) with Live(Group(bar_progress, spinner_progress)): for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1): bar_progress.advance(bar_progress_task) - sg_str = self.__get_subgraph_str(sg) - s_meta = self.__rpt_process_term(s) - o_meta = self.__rpt_process_term(o) - self.__rpt_process_statement(s_meta, p, o_meta, sg_str) - - contextualize_statement_func(s_meta, p, o_meta, sg_str) + self.__rpt_process_subject_predicate_object( + s, p, o, sg, None, contextualize_statement_func + ) if i % batch_size == 0: self.__insert_adb_docs(spinner_progress, use_async) @@ -870,7 +871,11 @@ def rdf_to_arangodb_by_pgt( self.__reified_subject_predicate_map = {} if simplify_reified_triples: - self.__parse_reified_triples(contextualize_statement_func, is_pgt=True) + self.__parse_reified_triples( + lambda s, p, o, sg, reified_triple_key: self.__pgt_process_subject_predicate_object( + s, p, o, sg, reified_triple_key, contextualize_statement_func + ) + ) with Live(Group(bar_progress, spinner_progress)): for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1): @@ -888,20 +893,9 @@ def rdf_to_arangodb_by_pgt( self.__pgt_rdf_val_to_adb_val(doc, key, o) continue - sg_str = self.__get_subgraph_str(sg) - - s_meta = self.__pgt_get_term_metadata(s) - self.__pgt_process_rdf_term(s_meta) - - p_meta = self.__pgt_get_term_metadata(p) - self.__pgt_process_rdf_term(p_meta) - - o_meta = self.__pgt_get_term_metadata(o) - self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str) - - self.__pgt_process_statement(s_meta, p_meta, o_meta, sg_str) - - contextualize_statement_func(s_meta, p_meta, o_meta, sg_str) + self.__pgt_process_subject_predicate_object( + s, p, o, sg, None, contextualize_statement_func + ) if i % batch_size == 0: self.__insert_adb_docs( @@ -1596,6 +1590,21 @@ def extract_adb_key_statements( # Private: RDF -> ArangoDB (RPT) # ################################## + def __rpt_process_subject_predicate_object( + self, + s: RDFTerm, + p: URIRef, + o: RDFTerm, + sg: Optional[List[Any]], + reified_triple_key: Optional[str], + contextualize_statement_func: Callable[..., None], + ) -> None: + sg_str = self.__get_subgraph_str(sg) + s_meta = self.__rpt_process_term(s) + o_meta = self.__rpt_process_term(o) + self.__rpt_process_statement(s_meta, p, o_meta, sg_str, reified_triple_key) + contextualize_statement_func(s_meta, p, o_meta, sg_str) + def __rpt_process_term(self, t: RDFTerm) -> RDFTermMeta: """RDF -> ArangoDB (RPT): Process an RDF Term as an ArangoDB document via RPT Standards. Returns the ArangoDB Collection & Document Key associated @@ -1753,6 +1762,30 @@ def __rpt_create_adb_graph(self, name: str) -> ADBGraph: # Private: RDF -> ArangoDB (PGT) # ################################## + def __pgt_process_subject_predicate_object( + self, + s: RDFTerm, + p: URIRef, + o: RDFTerm, + sg: Optional[List[Any]], + reified_triple_key: Optional[str], + contextualize_statement_func: Callable[..., None], + ) -> None: + sg_str = self.__get_subgraph_str(sg) + + s_meta = self.__pgt_get_term_metadata(s) + self.__pgt_process_rdf_term(s_meta) + + p_meta = self.__pgt_get_term_metadata(p) + self.__pgt_process_rdf_term(p_meta) + + o_meta = self.__pgt_get_term_metadata(o) + self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str) + + self.__pgt_process_statement(s_meta, p_meta, o_meta, sg_str, reified_triple_key) + + contextualize_statement_func(s_meta, p_meta, o_meta, sg_str) + def __pgt_get_term_metadata(self, t: Union[URIRef, BNode, Literal]) -> RDFTermMeta: """Return the following PGT-relevant metadata associated to the RDF Term: 1. The RDF Term (**term**) @@ -2274,12 +2307,18 @@ def __pgt_create_adb_graph(self, name: str) -> ADBGraph: ######################################## def __parse_reified_triples( - self, contextualize_statement_func: Callable[..., None], is_pgt: bool + self, process_subject_predicate_object_func: Callable[..., None] ) -> None: """RDF -> ArangoDB: Parse all reified triples within the RDF Graph if Reified Triple Simplification is enabled. NOTE: This modifies the RDF Graph in-place. TODO: Revisit + + :param process_subject_predicate_object_func: A function that processes + the RDF Statement (s, p, o). Set to either + `__rpt_process_subject_predicate_object` or + `__pgt_process_subject_predicate_object`. + :type process_subject_predicate_object_func: Callable[..., None] """ if isinstance(self.__rdf_graph, RDFConjunctiveGraph): query = """ @@ -2323,34 +2362,7 @@ def __parse_reified_triples( str(reified_subject), reified_subject ) - sg_str = self.__get_subgraph_str(sg) - - # TODO: Clean this up - # __process_subject_predicate_object() function - # - __pgt_process_subject_predicate_object() - # - __rpt_process_subject_predicate_object() - if is_pgt: - s_meta = self.__pgt_get_term_metadata(s) - self.__pgt_process_rdf_term(s_meta) - - p_meta = self.__pgt_get_term_metadata(p) - self.__pgt_process_rdf_term(p_meta) - - o_meta = self.__pgt_get_term_metadata(o) - self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str) - - self.__pgt_process_statement( - s_meta, p_meta, o_meta, sg_str, reified_triple_key - ) - - contextualize_statement_func(s_meta, p_meta, o_meta, sg_str) - else: - s_meta = self.__rpt_process_term(s) - o_meta = self.__rpt_process_term(o) - self.__rpt_process_statement( - s_meta, p, o_meta, sg_str, reified_triple_key - ) - contextualize_statement_func(s_meta, p, o_meta, sg_str) + process_subject_predicate_object_func(s, p, o, sg, reified_triple_key) self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement)) self.__rdf_graph.remove((reified_subject, RDF.subject, s))