# pull_wikibase_world.py
"""Pull Wikibase World Data"""
import asyncio
import json
from sqlalchemy import or_, select
from data.database_connection import get_async_session
from model.database.wikibase_model import WikibaseModel
from model.database.wikibase_url_model import WikibaseURLModel
# SPARQL query written against the wikibase.world prefixes: selects every item
# with wdt:P3 wd:Q10 together with its P1 (?url), P2 (?host) and P13
# (?available) values, plus the optional P7 (?sparqlUIUrl) and
# P8 (?sparqlEndpointUrl). The FILTER lines on ?host are SPARQL comments, i.e.
# currently disabled.
# NOTE(review): P3/Q10 presumably means "instance of: Wikibase" - confirm on
# wikibase.world. No live statement in the visible code uses this constant.
WIKIBASES_QUERY = """PREFIX wdt: <https://wikibase.world/prop/direct/>
PREFIX wd: <https://wikibase.world/entity/>
PREFIX wikibase: <http://wikiba.se/ontology#>
PREFIX bd: <http://www.bigdata.com/rdf#>
SELECT ?itemLabel ?url ?host ?hostLabel ?available ?availableLabel ?sparqlUIUrl ?sparqlEndpointUrl WHERE {
?item wdt:P3 wd:Q10;
wdt:P1 ?url;
wdt:P2 ?host;
wdt:P13 ?available.
# FILTER(?host != wd:Q4)
# FILTER(?host != wd:Q6)
# FILTER(?host != wd:Q7)
# FILTER(?host != wd:Q117)
# FILTER(?host != wd:Q8)
OPTIONAL { ?item wdt:P7 ?sparqlUIUrl. }
OPTIONAL { ?item wdt:P8 ?sparqlEndpointUrl. }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}"""
async def pull_wikibase_world(
    data_path: str = "./data/wikibase_world_data.json",
) -> None:
    """Import Wikibase World records that are not yet in the database.

    Loads JSON from ``data_path`` and iterates over it; each record is read
    with ``.get`` for the keys ``itemLabel``, ``url``, ``sparqlUIUrl`` and
    ``sparqlEndpointUrl``. For every record, the database is searched for a
    ``WikibaseModel`` whose ``wikibase_name`` equals the record's
    ``itemLabel`` or whose related ``WikibaseURLModel.url`` equals the
    record's ``url``. When nothing matches, the label is printed and a new
    ``WikibaseModel`` is added to the session.

    Args:
        data_path: Path of the JSON file to import. Defaults to the local
            snapshot ``./data/wikibase_world_data.json``.
            NOTE(review): the file is presumably a list of flat objects
            exported from the Wikibase World query - confirm the schema.
    """

    # NOTE: fetching live data (get_results against
    # https://wikibase.world/query/sparql with WIKIBASES_QUERY, query name
    # "Pull Wikibases") is disabled; records come from the JSON file instead.
    with open(data_path, mode="r", encoding="utf-8") as data_file:
        records = json.load(data_file)

    async with get_async_session() as async_session:
        for record in records:
            name = record.get("itemLabel")
            url = record.get("url")

            # A record counts as already known if either its name or its URL
            # is present in the database.
            known_query = select(WikibaseModel).where(
                or_(
                    WikibaseModel.wikibase_name == name,
                    WikibaseModel.url.has(WikibaseURLModel.url == url),
                )
            )
            matches = (await async_session.scalars(known_query)).all()
            if matches:
                continue

            # Print the label of each newly added Wikibase as progress output.
            print(name)
            async_session.add(
                WikibaseModel(
                    wikibase_name=name,
                    base_url=url,
                    sparql_query_url=record.get("sparqlUIUrl"),
                    sparql_endpoint_url=record.get("sparqlEndpointUrl"),
                )
            )

        # Persist all newly added rows in one commit once every record has
        # been examined.
        await async_session.commit()
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs the coroutine to completion,
    # closes the loop even on error, and re-raises any exception so a failed
    # import terminates the script with a traceback and non-zero exit status
    # (wrapping the task in asyncio.wait left the exception unretrieved).
    asyncio.run(pull_wikibase_world())