Skip to content

Commit

Permalink
checkpoint: __process_subject_predicate_object
Browse files Browse the repository at this point in the history
  • Loading branch information
aMahanna committed Jan 16, 2024
1 parent 4dc4948 commit fe9dd0f
Showing 1 changed file with 63 additions and 51 deletions.
114 changes: 63 additions & 51 deletions arango_rdf/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,18 +596,19 @@ def rdf_to_arangodb_by_rpt(

self.__reified_subject_predicate_map = {}
if simplify_reified_triples:
self.__parse_reified_triples(contextualize_statement_func, is_pgt=False)
self.__parse_reified_triples(
lambda s, p, o, sg, reified_triple_key: self.__rpt_process_subject_predicate_object(
s, p, o, sg, reified_triple_key, contextualize_statement_func
)
)

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1):
bar_progress.advance(bar_progress_task)

sg_str = self.__get_subgraph_str(sg)
s_meta = self.__rpt_process_term(s)
o_meta = self.__rpt_process_term(o)
self.__rpt_process_statement(s_meta, p, o_meta, sg_str)

contextualize_statement_func(s_meta, p, o_meta, sg_str)
self.__rpt_process_subject_predicate_object(
s, p, o, sg, None, contextualize_statement_func
)

if i % batch_size == 0:
self.__insert_adb_docs(spinner_progress, use_async)
Expand Down Expand Up @@ -870,7 +871,11 @@ def rdf_to_arangodb_by_pgt(

self.__reified_subject_predicate_map = {}
if simplify_reified_triples:
self.__parse_reified_triples(contextualize_statement_func, is_pgt=True)
self.__parse_reified_triples(
lambda s, p, o, sg, reified_triple_key: self.__pgt_process_subject_predicate_object(
s, p, o, sg, reified_triple_key, contextualize_statement_func
)
)

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1):
Expand All @@ -888,20 +893,9 @@ def rdf_to_arangodb_by_pgt(
self.__pgt_rdf_val_to_adb_val(doc, key, o)
continue

sg_str = self.__get_subgraph_str(sg)

s_meta = self.__pgt_get_term_metadata(s)
self.__pgt_process_rdf_term(s_meta)

p_meta = self.__pgt_get_term_metadata(p)
self.__pgt_process_rdf_term(p_meta)

o_meta = self.__pgt_get_term_metadata(o)
self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str)

self.__pgt_process_statement(s_meta, p_meta, o_meta, sg_str)

contextualize_statement_func(s_meta, p_meta, o_meta, sg_str)
self.__pgt_process_subject_predicate_object(
s, p, o, sg, None, contextualize_statement_func
)

if i % batch_size == 0:
self.__insert_adb_docs(
Expand Down Expand Up @@ -1596,6 +1590,21 @@ def extract_adb_key_statements(
# Private: RDF -> ArangoDB (RPT) #
##################################

def __rpt_process_subject_predicate_object(
    self,
    s: RDFTerm,
    p: URIRef,
    o: RDFTerm,
    sg: Optional[List[Any]],
    reified_triple_key: Optional[str],
    contextualize_statement_func: Callable[..., None],
) -> None:
    """RDF -> ArangoDB (RPT): Process a single RDF Statement (s, p, o).

    Resolves the sub-graph string, processes the subject and the object
    via `__rpt_process_term`, hands the statement to
    `__rpt_process_statement`, and finally invokes
    **contextualize_statement_func** on the same statement.

    :param s: The RDF Subject of the statement.
    :type s: RDFTerm
    :param p: The RDF Predicate of the statement.
    :type p: URIRef
    :param o: The RDF Object of the statement.
    :type o: RDFTerm
    :param sg: The sub-graph component of the statement (if any), forwarded
        as-is to `__get_subgraph_str`.
    :type sg: Optional[List[Any]]
    :param reified_triple_key: Forwarded untouched to
        `__rpt_process_statement`. Presumably the key of the reified triple
        this statement originates from, or None for a regular statement.
    :type reified_triple_key: Optional[str]
    :param contextualize_statement_func: Callback invoked last with
        (subject metadata, predicate, object metadata, sub-graph string).
    :type contextualize_statement_func: Callable[..., None]
    """
    subgraph_key = self.__get_subgraph_str(sg)

    # Subject is processed before the object, as in the original flow.
    subject_meta = self.__rpt_process_term(s)
    object_meta = self.__rpt_process_term(o)

    # Positional arguments shared by the statement processor and callback.
    statement = (subject_meta, p, object_meta, subgraph_key)

    self.__rpt_process_statement(*statement, reified_triple_key)
    contextualize_statement_func(*statement)

def __rpt_process_term(self, t: RDFTerm) -> RDFTermMeta:
"""RDF -> ArangoDB (RPT): Process an RDF Term as an ArangoDB document
via RPT Standards. Returns the ArangoDB Collection & Document Key associated
Expand Down Expand Up @@ -1753,6 +1762,30 @@ def __rpt_create_adb_graph(self, name: str) -> ADBGraph:
# Private: RDF -> ArangoDB (PGT) #
##################################

def __pgt_process_subject_predicate_object(
    self,
    s: RDFTerm,
    p: URIRef,
    o: RDFTerm,
    sg: Optional[List[Any]],
    reified_triple_key: Optional[str],
    contextualize_statement_func: Callable[..., None],
) -> None:
    """RDF -> ArangoDB (PGT): Process a single RDF Statement (s, p, o).

    Resolves the sub-graph string, fetches the term metadata of the
    subject and predicate (processing each via `__pgt_process_rdf_term`
    right after it is fetched), fetches the object metadata, and then runs
    `__pgt_process_object`, `__pgt_process_statement` and
    **contextualize_statement_func** on the statement, in that order.

    :param s: The RDF Subject of the statement.
    :type s: RDFTerm
    :param p: The RDF Predicate of the statement.
    :type p: URIRef
    :param o: The RDF Object of the statement.
    :type o: RDFTerm
    :param sg: The sub-graph component of the statement (if any), forwarded
        as-is to `__get_subgraph_str`.
    :type sg: Optional[List[Any]]
    :param reified_triple_key: Forwarded untouched to
        `__pgt_process_statement`. Presumably the key of the reified triple
        this statement originates from, or None for a regular statement.
    :type reified_triple_key: Optional[str]
    :param contextualize_statement_func: Callback invoked last with
        (subject metadata, predicate metadata, object metadata,
        sub-graph string).
    :type contextualize_statement_func: Callable[..., None]
    """
    subgraph_key = self.__get_subgraph_str(sg)

    # Each term is processed right after its metadata is fetched; the
    # interleaving is kept exactly as-is.
    subject_meta = self.__pgt_get_term_metadata(s)
    self.__pgt_process_rdf_term(subject_meta)

    predicate_meta = self.__pgt_get_term_metadata(p)
    self.__pgt_process_rdf_term(predicate_meta)

    # The object is not passed to __pgt_process_rdf_term here; it goes
    # through __pgt_process_object together with the rest of the statement.
    object_meta = self.__pgt_get_term_metadata(o)

    # Positional arguments shared by the three calls below.
    statement = (subject_meta, predicate_meta, object_meta, subgraph_key)

    self.__pgt_process_object(*statement)
    self.__pgt_process_statement(*statement, reified_triple_key)
    contextualize_statement_func(*statement)

def __pgt_get_term_metadata(self, t: Union[URIRef, BNode, Literal]) -> RDFTermMeta:
"""Return the following PGT-relevant metadata associated to the RDF Term:
1. The RDF Term (**term**)
Expand Down Expand Up @@ -2274,12 +2307,18 @@ def __pgt_create_adb_graph(self, name: str) -> ADBGraph:
########################################

def __parse_reified_triples(
self, contextualize_statement_func: Callable[..., None], is_pgt: bool
self, process_subject_predicate_object_func: Callable[..., None]
) -> None:
"""RDF -> ArangoDB: Parse all reified triples within the RDF Graph
if Reified Triple Simplification is enabled.
NOTE: This modifies the RDF Graph in-place. TODO: Revisit
:param process_subject_predicate_object_func: A function that processes
the RDF Statement (s, p, o). Set to either
`__rpt_process_subject_predicate_object` or
`__pgt_process_subject_predicate_object`.
:type process_subject_predicate_object_func: Callable[..., None]
"""
if isinstance(self.__rdf_graph, RDFConjunctiveGraph):
query = """
Expand Down Expand Up @@ -2323,34 +2362,7 @@ def __parse_reified_triples(
str(reified_subject), reified_subject
)

sg_str = self.__get_subgraph_str(sg)

# TODO: Clean this up
# __process_subject_predicate_object() function
# - __pgt_process_subject_predicate_object()
# - __rpt_process_subject_predicate_object()
if is_pgt:
s_meta = self.__pgt_get_term_metadata(s)
self.__pgt_process_rdf_term(s_meta)

p_meta = self.__pgt_get_term_metadata(p)
self.__pgt_process_rdf_term(p_meta)

o_meta = self.__pgt_get_term_metadata(o)
self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str)

self.__pgt_process_statement(
s_meta, p_meta, o_meta, sg_str, reified_triple_key
)

contextualize_statement_func(s_meta, p_meta, o_meta, sg_str)
else:
s_meta = self.__rpt_process_term(s)
o_meta = self.__rpt_process_term(o)
self.__rpt_process_statement(
s_meta, p, o_meta, sg_str, reified_triple_key
)
contextualize_statement_func(s_meta, p, o_meta, sg_str)
process_subject_predicate_object_func(s, p, o, sg, reified_triple_key)

self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement))
self.__rdf_graph.remove((reified_subject, RDF.subject, s))
Expand Down

0 comments on commit fe9dd0f

Please sign in to comment.