"""Loads all RDF Turtle files in custom/ and data/ directories"""
__author__ = "Jeremy Nelson"
import click
import datetime
import logging
import os
import requests
import rdflib
import re
import sys
import urllib.parse
import urllib.request
from types import SimpleNamespace
# lxml is preferred; the XSLT and xpath() calls below require it
try:
    from lxml import etree
except ImportError:
    import xml.etree.ElementTree as etree
from xml.etree import ElementTree as default_etree
from bibcat import clean_uris, replace_iri, slugify
from bibcat.rml import processor
from bibcat.linkers.deduplicate import Deduplicator
PROJECT_BASE = os.path.abspath(os.path.dirname(__file__))
sys.path.append(PROJECT_BASE)
#try:
# import instance.config as config
# print("Config is instance.config {}".format(config))
#except ImportError:
config = SimpleNamespace()
config.TRIPLESTORE_URL = "http://localhost:9999/blazegraph/sparql"
config.BASE_URL = "https://bibcat.coalliance.org/"
processor.NS_MGR.bf = rdflib.Namespace("http://id.loc.gov/ontologies/bibframe/")
processor.NS_MGR.rdf = rdflib.RDF
processor.NS_MGR.rdfs = rdflib.RDFS
processor.NS_MGR.schema = rdflib.Namespace("http://schema.org/")
processor.NS_MGR.owl = rdflib.OWL
# Alliance Preprocessor
PREFIX = """PREFIX bf: <http://id.loc.gov/ontologies/bibframe/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>"""
class AlliancePreprocessor(object):
    MARC_NS = {'marc': 'http://www.loc.gov/MARC21/slim'}

    ALLIANCE_KEY_SPARQL = PREFIX + """
    SELECT DISTINCT ?instance
    WHERE {{
        ?instance rdf:type bf:Instance .
        ?instance bf:identifiedBy ?ident .
        ?ident rdf:value ?value .
        FILTER(CONTAINS(?value, "{0}"))
    }}"""

    ORG_LABEL_SPARQL = PREFIX + """
    SELECT DISTINCT ?label
    WHERE {{
        <{0}> rdfs:label ?label .
    }}"""

    MAX_WORK_TRIPLES_SPARQL = PREFIX + """
    SELECT ?work ?count
    WHERE {
        ?work rdf:type bf:Work .
        {
            SELECT ?work (count(*) as ?count)
            WHERE {
                ?work ?p ?o .
            } GROUP BY ?work
        }
    } ORDER BY DESC(?count) LIMIT 1"""

    MAX_INSTANCE_TRIPLES_SPARQL = PREFIX + """
    SELECT ?instance ?count
    WHERE {{
        <{work}> bf:hasInstance ?instance .
        {{
            SELECT ?instance (count(*) as ?count)
            WHERE {{
                ?instance ?p ?o .
            }} GROUP BY ?instance
        }}
    }} ORDER BY DESC(?count) LIMIT 1"""

    WORKS_OF_INSTANCE_SPARQL = PREFIX + """
    SELECT DISTINCT ?work
    WHERE {{
        <{0}> bf:instanceOf ?work .
    }}"""

    def __init__(self,
                 bibframe_graph,
                 marc_xml,
                 institutional_iri,
                 triplestore_url=config.TRIPLESTORE_URL):
        """Creates an instance of the Alliance Preprocessor

        Args:
            bibframe_graph (rdflib.Graph): BIBFRAME RDF graph
            marc_xml (etree.XML): MARC XML record
            institutional_iri (rdflib.URIRef): Institutional IRI
            triplestore_url (str): URL of the triplestore's SPARQL endpoint
        """
        self.graph = bibframe_graph
        self.institutional_iri = institutional_iri
        self.marc_xml = marc_xml
        self.triplestore_url = triplestore_url

    def __get_create_items__(self, instance_iri):
        # Create a stub item if none exists
        def item_stub():
            base_iri = str(instance_iri).split("#")[0]
            item_iri = rdflib.URIRef("{}#InstanceStub".format(base_iri))
            self.graph.add((item_iri,
                            processor.NS_MGR.rdf.type,
                            processor.NS_MGR.bf.Item))
            self.graph.add((item_iri,
                            processor.NS_MGR.bf.itemOf,
                            instance_iri))
            self.graph.add((instance_iri,
                            processor.NS_MGR.bf.hasItem,
                            item_iri))
            return item_iri

        item_iris = []
        for row in self.graph.objects(subject=instance_iri,
                                      predicate=processor.NS_MGR.bf.hasItem):
            item_iris.append(row)
        if len(item_iris) < 1:
            item_iris.append(item_stub())
        return item_iris

    def __get_canonical_instance__(self):
        """Selects the BF Work with the largest number of triples, then
        returns that Work's Instance with the largest number of triples
        to use as the canonical Instance"""
        result = self.graph.query(
            AlliancePreprocessor.MAX_WORK_TRIPLES_SPARQL)
        if len(result.bindings) < 1:
            raise ValueError("Could not extract max work triples")
        org_work_iri = result.bindings[0]['work']
        # Extract the Instance of that Work with the most triples
        instance_result = self.graph.query(
            AlliancePreprocessor.MAX_INSTANCE_TRIPLES_SPARQL.format(
                work=org_work_iri))
        return rdflib.URIRef(instance_result.bindings[0]['instance'])

    def __get_works__(self, instance_iri):
        """Attempts to retrieve any existing Works from the triplestore
        that match the Instance IRI

        Args:
            instance_iri (rdflib.URIRef): URI of instance
        """
        works = []
        result = requests.post(
            self.triplestore_url,
            data={"query": AlliancePreprocessor.WORKS_OF_INSTANCE_SPARQL.format(
                      instance_iri),
                  "format": "json"})
        if result.status_code > 399:
            return works
        bindings = result.json().get("results").get("bindings")
        if len(bindings) < 1:
            return works
        for row in bindings:
            work_uri = row.get("work").get("value")
            works.append(rdflib.URIRef(work_uri))
        return works

    def __match_key__(self):
        """Looks up the record's Alliance match key (MARC 997$a) in the
        triplestore and returns an existing Instance IRI if one matches"""
        match_key = self.marc_xml.find(
            "marc:datafield[@tag='997']/marc:subfield[@code='a']",
            AlliancePreprocessor.MARC_NS)
        if match_key is None:
            return
        result = requests.post(
            self.triplestore_url,
            data={"query": AlliancePreprocessor.ALLIANCE_KEY_SPARQL.format(
                      match_key.text),
                  "format": "json"})
        if result.status_code > 399:
            return
        bindings = result.json().get("results").get("bindings")
        if len(bindings) < 1:
            return
        instance_url = bindings[0].get("instance").get("value")
        return rdflib.URIRef(instance_url)

    def __mint_instance_iri__(self, instance_iri):
        """Takes an existing BF Instance IRI, attempts to extract the Work
        label for minting a new Alliance IRI, and replaces all references
        to instance_iri in the graph.

        Args:
            instance_iri (rdflib.URIRef): URI of Instance
        """
        work_iri = self.graph.value(subject=instance_iri,
                                    predicate=processor.NS_MGR.bf.instanceOf)
        work_label = self.graph.value(subject=work_iri,
                                      predicate=processor.NS_MGR.rdfs.label)
        if work_label is None:
            return
        new_instance_iri = rdflib.URIRef(
            urllib.parse.urljoin(config.BASE_URL,
                                 slugify(work_label)))
        replace_iri(self.graph, instance_iri, new_instance_iri)
        return new_instance_iri

    def __mint_item_iris__(self, item_iris, instance_iri):
        """Takes a list of BF Item IRIs and mints a new IRI for each,
        based on the Instance IRI and the slugified institutional
        RDFS label

        Args:
            item_iris (list): List of Item IRIs
            instance_iri (rdflib.URIRef): New Instance IRI
        """
        output = []
        sparql = AlliancePreprocessor.ORG_LABEL_SPARQL.format(
            self.institutional_iri)
        result = requests.post(self.triplestore_url,
                               data={"query": sparql,
                                     "format": "json"})
        if result.status_code > 399:
            return output
        bindings = result.json().get('results').get('bindings')
        if len(bindings) < 1:
            return output
        institution_label = bindings[0].get('label').get('value')
        for i, item_iri in enumerate(item_iris):
            new_url = "{0}/{1}".format(
                instance_iri,
                slugify(institution_label))
            if i > 0:
                new_url += "-{0}".format(i)
            new_item_iri = rdflib.URIRef(new_url)
            replace_iri(self.graph, item_iri, new_item_iri)
            output.append(new_item_iri)
        return output

    def run(self):
        """Runs the Alliance Preprocessor"""
        clean_uris(self.graph)
        org_instance_iri = self.__get_canonical_instance__()
        org_item_iris = self.__get_create_items__(org_instance_iri)
        existing_instance_iri = self.__match_key__()
        if existing_instance_iri is not None:
            #for work in self.__get_works__(org_instance_iri):
            #    Should replace existing Work IRI
            replace_iri(self.graph, org_instance_iri, existing_instance_iri)
            new_instance_iri = existing_instance_iri
        else:
            new_instance_iri = self.__mint_instance_iri__(org_instance_iri)
        new_item_iris = self.__mint_item_iris__(org_item_iris, new_instance_iri)
        return new_instance_iri, new_item_iris
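
# Minimal usage sketch for AlliancePreprocessor (hypothetical values; the
# file names and institution IRI below are placeholders, not part of this
# repository):
#
#     bf_graph = rdflib.Graph()
#     bf_graph.parse("record-bf.xml")  # BIBFRAME RDF/XML for one MARC record
#     marc_record = etree.parse("record-marc.xml").getroot()
#     preprocessor = AlliancePreprocessor(
#         bf_graph,
#         marc_record,
#         rdflib.URIRef("https://www.coalliance.org/"))
#     instance_iri, item_iris = preprocessor.run()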


class AlliancePostProcessor(Deduplicator):
    """Class de-duplicates and generates IRIs for common BF classes"""

    def __init__(self, **kwargs):
        self.triplestore_url = kwargs.get(
            'triplestore_url',
            'http://localhost:9999/blazegraph/sparql/')
        kwargs["classes"] = [
            processor.NS_MGR.bf.Topic,
            processor.NS_MGR.bf.Person,
            processor.NS_MGR.bf.Organization,
            processor.NS_MGR.bf.Role,
            processor.NS_MGR.bf.Source
        ]
        super(AlliancePostProcessor, self).__init__(**kwargs)
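
# Usage sketch for AlliancePostProcessor (assumes bibcat's Deduplicator
# exposes a run() method; that API is not shown in this file):
#
#     post_processor = AlliancePostProcessor(
#         triplestore_url=config.TRIPLESTORE_URL)
#     post_processor.run()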


@click.command()
def turtles():
    """Loads all RDF Turtle files located in the custom and data
    directories."""
    start = datetime.datetime.now()
    click.echo("Loading RDF turtle files for Alliance BIBCAT at {}".format(
        start.isoformat()))
    # Load the custom ttl files first so that institutional metadata provides
    # richer context for the ttl files in the data directory
    headers = {"Content-type": "text/turtle"}
    with open(os.path.join(PROJECT_BASE, "custom/alliance.ttl"), "rb") as fo:
        result = requests.post(config.TRIPLESTORE_URL,
                               data=fo.read(),
                               headers=headers)
        print(result.status_code)
    for directory in ["data"]:
        turtle_path = os.path.join(PROJECT_BASE, directory)
        walker = next(os.walk(turtle_path))
        for filename in walker[2]:
            if not filename.endswith("ttl"):
                continue
            full_path = os.path.join(turtle_path, filename)
            with open(full_path, "rb") as fo:
                raw_turtle = fo.read()
            request = urllib.request.Request(
                url=config.TRIPLESTORE_URL,
                data=raw_turtle,
                headers=headers)
            with urllib.request.urlopen(request) as triplestore_response:
                click.echo("\t{} ingest result {}".format(
                    filename,
                    triplestore_response.read().decode('utf-8')))
    end = datetime.datetime.now()
    click.echo("Finished RDF turtle load at {}, total time {} minutes".format(
        end,
        (end - start).total_seconds() / 60.0))


def __update_instances__(xml, bf_rdf):
    """Adds the Alliance match key from MARC 997$a as a bf:Local
    identifier on each bf:Instance in the graph"""
    fields997 = xml.xpath(
        "marc:datafield[@tag='997']/marc:subfield[@code='a']",
        namespaces={"marc": "http://www.loc.gov/MARC21/slim"})
    if len(fields997) > 0:
        match_key = fields997[0].text
        bf_rdf.update("""INSERT {{
            ?instance bf:identifiedBy _:key .
            _:key rdf:type bf:Local .
            _:key bf:source <https://www.coalliance.org/> .
            _:key rdf:value \"""{0}\""" .
        }} WHERE {{
            ?instance rdf:type bf:Instance .
        }}""".format(match_key))


@click.command()
@click.argument('marc_filepath')
@click.argument('mrc2bf_xsl')
@click.argument("held_by")
@click.argument("output_file_base")
@click.option('--shard_size', default=None,
              help="Number of MARC records per sharded output graph")
def marc_xml(marc_filepath,
             mrc2bf_xsl,
             held_by,
             output_file_base,
             shard_size):
    """Takes a MARC XML file and a path to LOC's XSLT file and transforms
    the records into BIBFRAME 2.0 entities. If shard_size is set, shards
    the records and saves each shard to an output RDF file.

    \b
    Args:
        marc_filepath (str): File path to MARC XML
        mrc2bf_xsl (str): File path to LOC's marc2bibframe XSLT file
        held_by (str): Institution URL
        output_file_base (str): Output file base name
        shard_size (int): Size of shard in MARC records
    """
    logging.getLogger('rdflib').setLevel(logging.CRITICAL)
    #marc_context = etree.iterparse(marc_filepath)
    xslt_tree = etree.parse(mrc2bf_xsl)
    xslt_transform = etree.XSLT(xslt_tree)
    total = 0
    output_graph = None
    if shard_size is not None:
        shard_size = int(shard_size)
        output_graph = rdflib.Graph()
    start = datetime.datetime.utcnow()
    click.echo("Started transforming MARC to BF at {} for {}".format(
        start.isoformat(),
        marc_filepath))
    for action, elem in default_etree.iterparse(marc_filepath):
        if "record" not in elem.tag:
            continue
        record = etree.XML(default_etree.tostring(elem))
        try:
            bf_rdf_xml = xslt_transform(
                record,
                baseuri="'{0}'".format(config.BASE_URL))
        except Exception:
            click.echo("XSLT error with {:,}".format(total), nl=False)
            continue
        bf_rdf = rdflib.Graph()
        bf_rdf.namespace_manager.bind("bf", processor.NS_MGR.bf)
        try:
            bf_rdf.parse(data=etree.tostring(bf_rdf_xml))
        except Exception:
            click.echo("XML parse error with {:,}".format(total), nl=False)
            continue
        clean_uris(bf_rdf)
        bf_rdf.update("""INSERT {{
            ?item bf:heldBy <{0}> .
        }} WHERE {{
            ?item rdf:type bf:Item .
        }}""".format(held_by))
        __update_instances__(record, bf_rdf)
        if output_graph is not None:
            output_graph += bf_rdf
        total += 1
        if not total % 100:
            click.echo(".", nl=False)
        if not total % 1000:
            click.echo("{:,}".format(total), nl=False)
        if shard_size is not None and not total % shard_size:
            click.echo("w", nl=False)
            output_file = os.path.join(
                PROJECT_BASE,
                "data",
                "marc-output-{}-{}-{}k.xml".format(
                    output_file_base,
                    total - shard_size,
                    total))
            rdf_xml = output_graph.serialize(format="xml")
            # serialize() returns bytes in rdflib < 6 and str in rdflib >= 6
            if isinstance(rdf_xml, str):
                rdf_xml = rdf_xml.encode("utf-8")
            with open(output_file, "wb") as fo:
                fo.write(rdf_xml)
            output_graph = rdflib.Graph()
    # Write any remaining records that did not fill a complete shard
    if output_graph is not None and len(output_graph) > 0:
        output_file = os.path.join(
            PROJECT_BASE,
            "data",
            "marc-output-{}-{}-{}k.xml".format(
                output_file_base,
                total - (total % shard_size),
                total))
        rdf_xml = output_graph.serialize(format="xml")
        if isinstance(rdf_xml, str):
            rdf_xml = rdf_xml.encode("utf-8")
        with open(output_file, "wb") as fo:
            fo.write(rdf_xml)
    end = datetime.datetime.utcnow()
    click.echo("Finished at {} total time {} minutes for {} records".format(
        end.isoformat(),
        (end - start).total_seconds() / 60.0,
        total))


@click.group()
def cli():
    pass


cli.add_command(marc_xml)
cli.add_command(turtles)

if __name__ == '__main__':
    cli()
    #load_turtles()
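
# Hypothetical command-line invocations (argument values are placeholders;
# depending on the installed click version the command may be exposed as
# "marc-xml" rather than "marc_xml"):
#
#     python load.py turtles
#     python load.py marc_xml records.xml marc2bibframe.xsl \
#         https://www.coalliance.org/ alliance --shard_size 5000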