Skip to content

Commit

Permalink
fix: statements, rdf_graph ref
Browse files (browse the repository at this point in the history)
  • Loading branch information
aMahanna committed Jan 21, 2024
1 parent 40f9441 commit 4b9fd95
Showing 1 changed file with 42 additions and 45 deletions.
87 changes: 42 additions & 45 deletions arango_rdf/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,7 @@
from collections import defaultdict
from datetime import date, time
from pathlib import Path
from typing import (
Any,
Callable,
DefaultDict,
Dict,
Generator,
List,
Optional,
Set,
Tuple,
Union,
)
from typing import Any, Callable, DefaultDict, Dict, List, Optional, Set, Tuple, Union

import farmhash
from arango.cursor import Cursor
Expand All @@ -30,7 +19,6 @@
from rdflib import Dataset as RDFDataset
from rdflib import Graph as RDFGraph
from rdflib import Literal, URIRef
from rdflib.graph import _OptionalQuadType
from rich.console import Group
from rich.live import Live
from rich.progress import Progress
Expand Down Expand Up @@ -685,26 +673,26 @@ def rdf_to_arangodb_by_rpt(
adb_import_kwargs,
)

s: RDFTerm # Subject
p: URIRef # Predicate
o: RDFTerm # Object

statements = (
rdf_graph.quads
if isinstance(rdf_graph, RDFConjunctiveGraph)
else rdf_graph.triples
)

#############
# RPT: Main #
#############

rdf_graph_size = len(rdf_graph)
s: RDFTerm # Subject
p: URIRef # Predicate
o: RDFTerm # Object

rdf_graph_size = len(self.__rdf_graph)
batch_size = batch_size or rdf_graph_size
bar_progress = get_bar_progress("(RDF → ADB): RPT", "#BF23C4")
bar_progress_task = bar_progress.add_task("", total=rdf_graph_size)
spinner_progress = get_import_spinner_progress(" ")

statements = (
self.__rdf_graph.quads
if isinstance(rdf_graph, RDFConjunctiveGraph)
else self.__rdf_graph.triples
)

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1):
bar_progress.advance(bar_progress_task)
Expand Down Expand Up @@ -943,16 +931,6 @@ def rdf_to_arangodb_by_pgt(

self.__pgt_remove_blacklisted_statements()

s: RDFTerm # Subject
p: URIRef # Predicate
o: RDFTerm # Object

statements = (
self.__rdf_graph.quads
if isinstance(self.__rdf_graph, RDFConjunctiveGraph)
else self.__rdf_graph.triples
)

#################################
# Graph Contextualization (WIP) #
#################################
Expand Down Expand Up @@ -986,7 +964,9 @@ def rdf_to_arangodb_by_pgt(
self.__adb_col_statements = adb_col_statements

elif rdf_graph_has_adb_col_statements:
self.__adb_col_statements = self.extract_adb_col_statements(rdf_graph)
self.__adb_col_statements = self.extract_adb_col_statements(
self.__rdf_graph
)

else:
self.__adb_col_statements = RDFGraph()
Expand All @@ -999,7 +979,7 @@ def rdf_to_arangodb_by_pgt(
# us to run the ArangoDB Collection Mapping algorithm
# regardless of **write_adb_col_statements**
self.__adb_col_statements = self.write_adb_col_statements(
rdf_graph, self.__adb_col_statements, contextualize_graph
self.__rdf_graph, self.__adb_col_statements, contextualize_graph
)

###########################
Expand All @@ -1020,7 +1000,6 @@ def rdf_to_arangodb_by_pgt(
###########################

self.__pgt_parse_literal_statements(
statements,
contextualize_statement_func,
batch_size,
adb_import_kwargs,
Expand All @@ -1030,13 +1009,22 @@ def rdf_to_arangodb_by_pgt(
# PGT: Main #
#############

rdf_graph_size = len(rdf_graph)
batch_size = batch_size or rdf_graph_size
s: RDFTerm # Subject
p: URIRef # Predicate
o: RDFTerm # Object

rdf_graph_size = len(self.__rdf_graph)
batch_size = batch_size or rdf_graph_size
bar_progress = get_bar_progress("(RDF → ADB): PGT", "#08479E")
bar_progress_task = bar_progress.add_task("", total=rdf_graph_size)
spinner_progress = get_import_spinner_progress(" ")

statements = (
self.__rdf_graph.quads
if isinstance(self.__rdf_graph, RDFConjunctiveGraph)
else self.__rdf_graph.triples
)

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1):
bar_progress.advance(bar_progress_task)
Expand Down Expand Up @@ -1995,7 +1983,6 @@ def __pgt_remove_blacklisted_statements(self) -> None:

def __pgt_parse_literal_statements(
self,
statements: Callable[..., Generator[_OptionalQuadType, None, None]],
pgt_contextualize_statement_func: Callable[..., None],
batch_size: Optional[int],
adb_import_kwargs: Dict[str, Any],
Expand All @@ -2006,12 +1993,16 @@ def __pgt_parse_literal_statements(
Essential for RDF -> ArangoDB (PGT) transformations, as RDF Literals
are stored as ArangoDB Document properties.
:param statements: A function that returns a generator of RDF Statements.
:type statements: Callable[..., Generator[_OptionalQuadType, None, None]]
:param pgt_contextualize_statement_func: A function that contextualizes
an RDF Statement. A no-op function is used if Graph Contextualization
is disabled.
:type pgt_contextualize_statement_func: Callable[..., None]
:param batch_size: The batch size to use when inserting ArangoDB Documents.
Defaults to None.
:type batch_size: int | None
:param adb_import_kwargs: The keyword arguments to pass to
`ArangoRDF.__insert_adb_docs()`.
:type adb_import_kwargs: Dict[str, Any]
"""
# TODO: Revisit FILTER clauses
# We rely on the FILTER clauses to make sure no literal
Expand All @@ -2037,15 +2028,21 @@ def __pgt_parse_literal_statements(

data = self.__rdf_graph.query(query)

s: RDFTerm
p: URIRef
o: Literal

total = len(data)
batch_size = batch_size or total
bar_progress = get_bar_progress("(RDF → ADB): PGT [RDF Literals]", "#EF7D00")
bar_progress_task = bar_progress.add_task("", total=total - 1)
spinner_progress = get_import_spinner_progress(" ")

s: RDFTerm
p: URIRef
o: Literal
statements = (
self.__rdf_graph.quads
if isinstance(self.__rdf_graph, RDFConjunctiveGraph)
else self.__rdf_graph.triples
)

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p) in enumerate(data, 1):
Expand Down

0 comments on commit 4b9fd95

Please sign in to comment.