new: pgt_remove_blacklisted_statements, `pgt_parse_literal_statements`, remove `adb_col_blacklist`

so many todos...
aMahanna committed Jan 17, 2024
1 parent 01c31df commit 28dff29
Showing 1 changed file with 90 additions and 41 deletions.
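A rough sketch of the new flow (illustrative only, not part of the commit): blacklisted rdf:type statements are stripped from the RDF graph up front, and literal statements are grouped per (subject, predicate) with SPARQL before the main PGT import loop runs. The toy graph `g` and its triples below are hypothetical.

from rdflib import Graph, Literal, URIRef
from rdflib.namespace import RDF

g = Graph()  # hypothetical toy graph
ex = "http://example.com/"
g.add((URIRef(ex + "list1"), RDF.type, RDF.List))
g.add((URIRef(ex + "alice"), URIRef(ex + "name"), Literal("Alice")))

# 1) Remove "blacklisted" rdf:type statements (rdf:List, rdf:Bag, rdf:Seq)
#    up front, instead of skipping them statement-by-statement in the loop.
for p, o in {(RDF.type, RDF.List), (RDF.type, RDF.Bag), (RDF.type, RDF.Seq)}:
    for s in list(g.subjects(p, o)):  # list() so the graph can be mutated safely
        g.remove((s, p, o))

# 2) Group literal statements by (subject, predicate) so each resource's
#    literal values can be folded into a single ArangoDB document.
query = """
    SELECT ?subject ?predicate
    WHERE { ?subject ?predicate ?object . FILTER isLiteral(?object) }
    GROUP BY ?subject ?predicate
"""
for s, p in g.query(query):
    for _, _, o in g.triples((s, p, None)):
        print(s, p, o)  # -> http://example.com/alice http://example.com/name Alice

The commit's `__pgt_remove_blacklisted_statements` and `__pgt_parse_literal_statements` below apply the same pattern to `self.__rdf_graph`, with extra filters for RDF Collection/Container structure.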
131 changes: 90 additions & 41 deletions arango_rdf/main.py
@@ -6,7 +6,18 @@
from collections import defaultdict
from datetime import date, time
from pathlib import Path
from typing import Any, Callable, DefaultDict, Dict, List, Optional, Set, Tuple, Union
from typing import (
Any,
Callable,
DefaultDict,
Dict,
Generator,
List,
Optional,
Set,
Tuple,
Union,
)

import farmhash
from arango.cursor import Cursor
@@ -18,6 +29,7 @@
from rdflib import Dataset as RDFDataset
from rdflib import Graph as RDFGraph
from rdflib import Literal, URIRef
from rdflib.graph import _OptionalQuadType
from rich.console import Group
from rich.live import Live
from rich.progress import Progress
@@ -820,12 +832,6 @@ def rdf_to_arangodb_by_pgt(
self.__rdf_list_heads: RDFListHeads = defaultdict(lambda: defaultdict(dict))
self.__rdf_list_data: RDFListData = defaultdict(lambda: defaultdict(dict))

# A set of ArangoDB Collections that will NOT be imported via
# batch processing, as they contain documents whose properties
# are subject to change. For example, an RDF Resource may have
# multiple Literal statements associated with it.
self.__adb_col_blacklist: Set[str] = set()

# The ArangoDB Collection name of all unidentified RDF Resources
self.__UNKNOWN_RESOURCE = f"{name}_UnknownResource"

@@ -835,12 +841,6 @@ def rdf_to_arangodb_by_pgt(
if overwrite_graph:
self.db.delete_graph(name, ignore_missing=True, drop_collections=True)

rdf_statement_blacklist = {
(RDF.type, RDF.List),
(RDF.type, RDF.Bag),
(RDF.type, RDF.Seq),
}

rdf_graph_size = len(rdf_graph)
batch_size = batch_size or rdf_graph_size

@@ -858,12 +858,18 @@
bar_progress_task = bar_progress.add_task(name, total=rdf_graph_size)
spinner_progress = get_import_spinner_progress(" ")

self.__pgt_remove_blacklisted_statements()

# NOTE: Graph Contextualization is an experimental work-in-progress
contextualize_statement_func = empty_func
if contextualize_graph:
self.__prepare_graph_for_contextualization(is_pgt=True)
contextualize_statement_func = self.__pgt_contextualize_statement

# self.__adb_col_statements = self.extract_adb_col_statements(rdf_graph)
# if write_adb_col_statements:
# self.__adb_col_statements = self.write_adb_col_statements(rdf_graph)

elif (None, self.adb_col_uri, None) not in rdf_graph:
self.__adb_col_statements = self.write_adb_col_statements(rdf_graph)
else:
@@ -877,13 +883,15 @@
)
)

# TODO:
# self.__pgt_parse_rdf_lists()

self.__pgt_parse_literal_statements(statements, contextualize_statement_func)

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1):
bar_progress.advance(bar_progress_task)

if (p, o) in rdf_statement_blacklist:
continue

# Address the possibility of (s, p, o) being a part of the
# structure of an RDF Collection or an RDF Container.
rdf_list_col = self.__pgt_statement_is_part_of_rdf_list(s, p)
@@ -898,9 +906,7 @@
)

if i % batch_size == 0:
self.__insert_adb_docs(
spinner_progress, use_async, self.__adb_col_blacklist
)
self.__insert_adb_docs(spinner_progress, use_async)

self.__insert_adb_docs(spinner_progress, use_async)

@@ -1768,6 +1774,64 @@ def __rpt_create_adb_graph(self, name: str) -> ADBGraph:
# Private: RDF -> ArangoDB (PGT) #
##################################

def __pgt_remove_blacklisted_statements(self) -> None:
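# Remove rdf:type statements whose object is rdf:List, rdf:Bag, or rdf:Seq
# from the graph up front, so the main PGT loop no longer has to skip them
# on a per-statement basis.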
rdf_statement_blacklist = {
(RDF.type, RDF.List),
(RDF.type, RDF.Bag),
(RDF.type, RDF.Seq),
}

for p, o in rdf_statement_blacklist:
for s in self.__rdf_graph.subjects(p, o):
self.__rdf_graph.remove((s, p, o))

def __pgt_parse_literal_statements(
self,
statements: Callable[..., Generator[_OptionalQuadType, None, None]],
pgt_contextualize_statement_func: Callable[..., None],
) -> None:
# query = """
# SELECT ?subject ?predicate
# WHERE {
# ?subject ?predicate ?object .
# FILTER isLiteral(?object)
# }
# GROUP BY ?subject ?predicate
# """

# TODO: Revisit FILTER clauses
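# The filters below keep only statements whose object is a Literal, skip
# subjects that are part of an RDF Collection (rdf:first / rdf:rest), and
# skip RDF Container membership predicates (rdf:_1, rdf:_2, ..., rdf:li).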
query = """
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT ?subject ?predicate
WHERE {
?subject ?predicate ?object .
FILTER isLiteral(?object)
FILTER NOT EXISTS { ?subject rdf:first ?anyObject }
FILTER NOT EXISTS { ?subject rdf:rest ?anyObject }
FILTER (!regex(str(?predicate), "^http://www.w3.org/1999/02/22-rdf-syntax-ns#_[0-9]+$"))
FILTER (!regex(str(?predicate), "^http://www.w3.org/1999/02/22-rdf-syntax-ns#li$"))
}
GROUP BY ?subject ?predicate
"""

for s, p in self.__rdf_graph.query(query):
s_meta = self.__pgt_get_term_metadata(s)
self.__pgt_process_rdf_term(s_meta)

p_meta = self.__pgt_get_term_metadata(p)
self.__pgt_process_rdf_term(p_meta)

for _, _, o, *sg in statements((s, p, None)):
sg_str = self.__get_subgraph_str(sg)

o_meta = self.__pgt_get_term_metadata(o)
self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str)

pgt_contextualize_statement_func(s_meta, p_meta, o_meta, sg_str)

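# Remove the processed literal statement from the graph so the main
# import loop in rdf_to_arangodb_by_pgt does not visit it again.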
self.__rdf_graph.remove((s, p, o))

def __pgt_process_subject_predicate_object(
self,
s: RDFTerm,
@@ -1815,7 +1879,6 @@ def __pgt_get_term_metadata(self, t: Union[URIRef, BNode, Literal]) -> RDFTermMe

if p := self.__reified_subject_predicate_map.get(t):
t_col = t_label = self.rdf_id_to_adb_label(str(p))
self.__adb_col_blacklist.add(t_col) # TODO: Revisit

else:
t_col = str(
@@ -1913,8 +1976,6 @@ def __pgt_process_rdf_term(
t_value = self.__get_literal_val(t, str(t))
self.__pgt_rdf_val_to_adb_val(doc, p_label, t_value, process_val_as_string)

self.__adb_col_blacklist.add(s_col) # TODO: REVISIT

if sg_str:
doc["_sub_graph_uri"] = sg_str

@@ -2002,9 +2063,6 @@ def __pgt_process_statement(
self.__e_col_map[p_label]["from"].add(s_col)
self.__e_col_map[p_label]["to"].add(o_col)

if reified_triple_key:
self.__adb_col_blacklist.add(p_label)

def __pgt_object_is_head_of_rdf_list(self, o: RDFTerm) -> bool:
"""Return True if the RDF Object *o* is either the "root" node
of some RDF Collection or RDF Container within the RDF Graph.
@@ -2287,9 +2345,12 @@ def __pgt_create_adb_graph(self, name: str) -> ADBGraph:
):
all_v_cols.add(str(col))

adb_col_colblacklist = ["Statement", "List"] # TODO: REVISIT
for adb_col in adb_col_colblacklist:
all_v_cols.discard(adb_col)
# TODO: Revisit the following
# This discard prevents these collections
# from appearing as empty collections in the graph
# (they don't actually hold any documents)
all_v_cols.discard("Statement")
all_v_cols.discard("List")

for e_col, v_cols in self.__e_col_map.items():
edge_definitions.append(
@@ -2745,20 +2806,11 @@ def __get_literal_val(self, t: Literal, t_str: str) -> Any:

return t.value if t.value is not None else t_str

def __insert_adb_docs(
self,
spinner_progress: Progress,
use_async: bool,
adb_col_blacklist: Set[str] = set(),
) -> None:
def __insert_adb_docs(self, spinner_progress: Progress, use_async: bool) -> None:
"""RDF -> ArangoDB: Insert ArangoDB documents into their ArangoDB collection.
:param use_async: Performs asynchronous ingestion if enabled.
:type use_async: bool
:param adb_col_blacklist: A list of ArangoDB Collections that will not be
populated on this call of `__insert_adb_docs()`. Essential for allowing List
construction of RDF Literals (PGT Only).
:type adb_col_blacklist: Set[str]
"""
if len(self.__adb_docs) == 0:
return
@@ -2769,9 +2821,6 @@ def __insert_adb_docs(
adb_cols = list(self.__adb_docs.keys())

for col in adb_cols:
if col in adb_col_blacklist:
continue

doc_list = self.__adb_docs[col].values()

action = f"ADB Import: '{col}' ({len(doc_list)})"
