# This file is part of the NIME Proceedings Analyzer (NIME PA)
# Copyright (C) 2024 Jackson Goode, Stefano Fasciani
# The NIME PA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# The NIME PA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# If you use the NIME Proceedings Analyzer or any part of it in any program or
# publication, please acknowledge its authors by adding a reference to:
# J. Goode, S. Fasciani, A Toolkit for the Analysis of the NIME Proceedings
# Archive, in 2022 International Conference on New Interfaces for
# Musical Expression, Auckland, New Zealand, 2022.
import datetime
import itertools
import os
import random
import re
import time
import orjson
import requests
import unidecode
from dotenv import find_dotenv, load_dotenv
from opencage.geocoder import OpenCageGeocode
from tqdm import tqdm
import pa_print
from pa_utils import try_index
load_dotenv(find_dotenv())
geocoder = OpenCageGeocode("c55bcffbb38246aab6e54c136a5fac75")
email_regex = re.compile(r"@[a-zA-Z0-9-–]+\.[a-zA-Z0-9-–.]+")
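# e.g. email_regex.findall("jane.doe@ifi.uio.no") -> ["@ifi.uio.no"] (illustrative address)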
def scholar_api_paper_search(query, key, sleep, wait):
    api = "https://api.semanticscholar.org/graph/v1/paper/search?query="
    fields = "&fields=authors,title,year,citationCount,influentialCitationCount"
    # The Semantic Scholar Graph API expects the key in the 'x-api-key' header
    headers = {"x-api-key": key} if key != "" else None
    while True:
        query_result = requests.get(api + query + fields, headers=headers).json()
        time.sleep(sleep)
        # Retry only while rate-limited (response carries a 'message') and waiting is enabled
        if "message" not in query_result or not wait:
            break
    return query_result
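# Usage sketch (hypothetical query string; an empty key sends an unauthenticated request):
#   result = scholar_api_paper_search("some paper title Smith 2019", key="", sleep=1, wait=True)
#   if "data" in result and result["data"]:
#       print(result["data"][0]["title"], result["data"][0]["citationCount"])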
def scholar_api_paper_manual_lookup(paper_id, key, sleep, wait):
    api = "https://api.semanticscholar.org/graph/v1/paper/"
    fields = "?fields=authors,title,year,citationCount,influentialCitationCount"
    headers = {"x-api-key": key} if key != "" else None
    while True:
        query_result = requests.get(api + paper_id + fields, headers=headers).json()
        time.sleep(sleep)
        if "message" not in query_result or not wait:
            break
    return query_result
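# Usage sketch (hypothetical paper ID; returns one paper record rather than a search list):
#   paper = scholar_api_paper_manual_lookup("0123456789abcdef...", key="", sleep=1, wait=True)
#   print(paper.get("title"), paper.get("citationCount"))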
def scholar_api_paper_citref_lookup(paper_id, key, sleep, wait):
    api = "https://api.semanticscholar.org/graph/v1/paper/"
    cit = "citations.authors,citations.title,citations.year,citations.s2FieldsOfStudy,citations.publicationTypes,citations.journal,citations.publicationVenue"
    ref = "references.authors,references.title,references.year,references.s2FieldsOfStudy,references.publicationTypes,references.journal,references.publicationVenue"
    fields = (
        "?fields=title,authors,paperId,embedding,s2FieldsOfStudy,publicationTypes,publicationVenue,tldr,"
        + cit
        + ","
        + ref
    )
    headers = {"x-api-key": key} if key != "" else None
    while True:
        query_result = requests.get(api + paper_id + fields, headers=headers).json()
        time.sleep(sleep)
        if "message" not in query_result or not wait:
            break
    return query_result
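# Usage sketch (hypothetical paper ID): the result bundles the paper's own metadata
# with nested per-citation/per-reference records:
#   result = scholar_api_paper_citref_lookup("0123456789abcdef...", key="", sleep=1, wait=True)
#   for ref in result.get("references", []):
#       print(ref.get("title"), ref.get("year"))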
def request_scholar(pub, args):
"""Queries citations from Semantic Scholar
:publication from bibtex file
"""
try:
with open("./cache/json/scholar_cache.json", "rb") as fp:
scholar_cache = orjson.loads(fp.read())
except FileNotFoundError:
pa_print.tprint("\nCreating new Semantic Scholar cache!")
scholar_cache = {}
# Fix names for searching
regextitle = re.compile(r"[^a-zA-Z0-9 ]")
regexname = re.compile(r"[^a-zA-Z- ]")
author_last_list = []
    for _, last in pub["author names"]:
        last = last.split("-")[-1]
        author_last_list.append(last)
title = unidecode.unidecode(pub["title"])
    # Initialize scholar fields by type, assigning a fresh list/dict per key so
    # mutable defaults are never shared between fields
    for suffix in ["authors id", "field of study", "publication type", "references", "citations"]:
        pub[f"scholar {suffix}"] = []
    for suffix in ["query", "citation count", "influential citation count", "reference count", "paper id", "title"]:
        pub[f"scholar {suffix}"] = "N/A"
    for suffix in ["publication venue", "embedding", "tldr"]:
        pub[f"scholar {suffix}"] = {}
pub["scholar valid"] = False
skip = False
if args.nime:
        # papers (mostly installations) whose short titles return wrong entries in
        # Semantic Scholar, or whose reported citation data is significantly wrong
        to_skip = ["Deutscher2005", "Biggs2007", "Court2007", "Sa2007", "Stanza2007", "Stark2007", "Dubois2009", "Overholt2009", "Wechsler2009", "Michon2013", "Paine2016"]
        if pub["ID"] in to_skip:
            pa_print.tprint("Skipping paper due to reported wrong data in Semantic Scholar or ambiguous query")
skip = True
# Make query title, name and year lists
query_title = list(
dict.fromkeys(
[
title,
regextitle.sub("", title),
" ".join([w for w in title.split() if len(w) > 1]),
]
)
)
if len(author_last_list) > 1:
query_name = [" ".join(author_last_list), author_last_list[0], ""]
else:
query_name = [author_last_list[0], ""]
query_year = ["", pub["year"]]
# Save query to be used for cache
full_query = f"{title} {' '.join(author_last_list)} {pub['year']}"
pub["scholar query"] = full_query
# Set key either through arg or .env
sskey = args.sskey or os.getenv("SSKEY") or ""
if full_query not in scholar_cache:
pa_print.tprint(f"\nQuerying Semantic Scholar...")
last_iter = False
force = False
lookup_result = {}
queries = list(itertools.product(query_title, query_name, query_year))
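    # Up to 3 title x 3 name x 2 year variants = at most 18 combinations, tried in order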
    if args.manual:
        # duplicate the first query so a final manual-entry pass runs on the last iteration
        queries.append(queries[0])
num_of_queries = len(queries)
    for idx, temp in enumerate(queries):
        if args.nime and skip:
            break
        # Check if this is the last query
        if idx == (num_of_queries - 1):
            last_iter = True
        # Generate a new query from the combination
        temp_title, temp_author, temp_year = temp
        scholar_query = f"{temp_title} {temp_author} {temp_year}"
        # Ask for the paper ID manually and retrieve title/authors for the search
        if args.manual and last_iter:
            manual_id = input("Type the Semantic Scholar paper ID manually and press Enter, or press Enter to skip: ")
            if manual_id:
                lookup_result = scholar_api_paper_manual_lookup(
                    manual_id, sskey, args.sleep, args.wait
                )
                if "message" in lookup_result:
                    temp = lookup_result["message"]
                    pa_print.tprint(f"Semantic Scholar lookup returned message: {temp}")
                else:
                    query_result = {"data": [lookup_result]}
                    force = True
else:
# Try query
pa_print.tprint(f"Trying query: '{scholar_query}'")
query_result = scholar_api_paper_search(
scholar_query, sskey, args.sleep, args.wait
)
if "message" in query_result:
temp = query_result["message"]
pa_print.tprint(f"Semantic Scholar search returned message: {temp}")
        if (
            "message" not in query_result
            and "error" not in query_result
            and len(query_result.get("data", [])) > 0
        ):
            if ("citationCount" in query_result["data"][0] or last_iter) and (
                len(query_result["data"][0]["authors"]) <= (len(author_last_list) + 1)
                or force
            ):
result_author = " ".join(
[t["name"] for t in query_result["data"][0]["authors"]]
)
result_author = regexname.sub(
"", unidecode.unidecode(result_author)
).lower()
query_author = regexname.sub(
"", author_last_list[0].lower().split(" ")[-1]
)
                if force:
                    if query_result["data"][0]["paperId"] != manual_id:
                        retrieved_id = query_result["data"][0]["paperId"]
                        # if this ever gets printed, improve the algorithm
                        pa_print.tprint(
                            f"Paper ID mismatch, provided: {manual_id} vs retrieved: {retrieved_id}"
                        )
                        break
                if query_author in result_author or force:
# if paper never cited, creating citation fields and setting to 0
if "citationCount" not in query_result["data"][0]:
query_result["data"][0]["citationCount"] = 0
query_result["data"][0]["influentialCitationCount"] = 0
pub["scholar citation count"] = query_result["data"][0][
"citationCount"
]
pub["scholar influential citation count"] = query_result[
"data"
][0]["influentialCitationCount"]
pub["scholar paper id"] = query_result["data"][0]["paperId"]
pub["scholar title"] = query_result["data"][0]["title"]
pub["scholar authors id"] = [
t["authorId"] for t in query_result["data"][0]["authors"]
]
scholar_cache[full_query] = query_result
if pub["scholar paper id"] not in scholar_cache:
pa_print.tprint(f"\nSemantic Scholar paper lookup...")
lookup_result = scholar_api_paper_citref_lookup(
pub["scholar paper id"], sskey, args.sleep, args.wait
)
if "message" not in lookup_result:
scholar_cache[pub["scholar paper id"]] = lookup_result
else:
lookup_result = scholar_cache[pub["scholar paper id"]]
pa_print.tprint(
f"✓ - Paper has been cited {pub['scholar citation count']} times"
)
break
if pub["scholar citation count"] == "N/A":
pa_print.tprint("x - Cannot find paper in Semantic Scholar")
# scholar_cache[full_query] = 'N/A'
    else:
        lookup_result = {}
        if scholar_cache[full_query] != "N/A" and not skip:
            cached = scholar_cache[full_query]["data"][0]
            pub["scholar citation count"] = cached["citationCount"]
            pub["scholar influential citation count"] = cached["influentialCitationCount"]
            pub["scholar paper id"] = cached["paperId"]
            pub["scholar title"] = cached["title"]
            pub["scholar authors id"] = [t["authorId"] for t in cached["authors"]]
            if pub["scholar paper id"] not in scholar_cache:
                pa_print.tprint("\nSemantic Scholar paper lookup...")
                lookup_result = scholar_api_paper_citref_lookup(
                    pub["scholar paper id"], sskey, args.sleep, args.wait
                )
                scholar_cache[pub["scholar paper id"]] = lookup_result
            else:
                lookup_result = scholar_cache[pub["scholar paper id"]]
                if "embedding" in lookup_result:
                    pub["scholar embedding"] = lookup_result["embedding"]
                if "tldr" in lookup_result:
                    pub["scholar tldr"] = lookup_result["tldr"]
                if "citations" in lookup_result:
                    pub["scholar citations"] = lookup_result["citations"]
                if "references" in lookup_result:
                    pub["scholar references"] = lookup_result["references"]
                    pub["scholar reference count"] = len(lookup_result["references"])
                    if pub["scholar reference count"] > 0:
                        pub["scholar valid"] = True
            pa_print.tprint(
                f"\no - Retrieved from cache: {pub['scholar citation count']} citations"
            )
if lookup_result:
if "embedding" in lookup_result:
pub["scholar embedding"] = lookup_result["embedding"]
if "tldr" in lookup_result:
pub["scholar tldr"] = lookup_result["tldr"]
if "s2FieldsOfStudy" in lookup_result:
pub["scholar field of study"] = lookup_result["s2FieldsOfStudy"]
if "publicationTypes" in lookup_result:
pub["scholar publication venue"] = lookup_result["publicationVenue"]
if "publicationTypes" in lookup_result:
pub["scholar publication type"] = lookup_result["publicationTypes"]
if "citations" in lookup_result:
pub["scholar citations"] = lookup_result["citations"]
if "references" in lookup_result:
pub["scholar references"] = lookup_result["references"]
pub["scholar reference count"] = len(lookup_result["references"])
if pub["scholar reference count"] > 0:
pub["scholar valid"] = True
# Average citations per year of age
if pub["scholar citation count"] != "N/A":
if pub["age"] == 0:
pub["age"] = 1
pub["scholar yearly citations"] = (
int(pub["scholar citation count"]) / pub["age"]
)
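        # e.g. 30 citations for a 6-year-old paper -> 5.0 yearly citations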
else:
pub["scholar yearly citations"] = "N/A"
# Resetting fields to defaults if null
if not pub["scholar authors id"]:
pub["scholar authors id"] = []
if not pub["scholar field of study"]:
pub["scholar field of study"] = []
if not pub["scholar publication type"]:
pub["scholar publication type"] = []
if not pub["scholar references"]:
pub["scholar references"] = []
if not pub["scholar citations"]:
pub["scholar citations"] =[]
if not pub["scholar publication venue"]:
pub["scholar publication venue"] ={}
if not pub["scholar embedding"]:
pub["scholar embedding"] ={}
if not pub["scholar tldr"]:
pub["scholar tldr"] ={}
with open("./cache/json/scholar_cache.json", "wb") as fp:
fp.write(orjson.dumps(scholar_cache))
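# Usage sketch for request_scholar (hypothetical record; real ones come from the
# BibTeX parsing step and carry more fields):
#   pub = {"title": "Some NIME Paper", "author names": [("Jane", "Doe")],
#          "year": 2022, "ID": "Doe2022", "age": 2}
#   request_scholar(pub, args)
#   print(pub["scholar citation count"], pub["scholar yearly citations"])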
def request_location(author_info, args, pub):
"""Extracts location from author blocks or universities and queries OpenCageGeocode
:publication from bibtex file
"""
author_count = pub["author count"]
# Conference location lookup
cnf_query = pub["address"]
query_type = "conference"
query_location(cnf_query, query_type, pub, args) # *** creates unneeded columns ***
# Author location lookup
for author in range(author_count): # length of usable locations
query_type = "author"
# Assign one query (in order of priority)
# 1) If there is a university address from grobid
if pub["grobid author unis"][author] != "N/A": # uni address
location_query = ", ".join(
pub["grobid author unis"][author]
) # (uni name, country)
query_origin = "grobid uni"
# 2) If grobid was used to add address (while 'location' is api derived)
elif pub["grobid addresses"][author] != "N/A":
location_query = pub["grobid addresses"][author]
query_origin = "grobid address"
        # 3) If there's a uni address from the text block
elif pub["text author unis"][author] != "N/A":
location_query = ", ".join(
pub["text author unis"][author]
) # (uni name, country)
query_origin = "text uni"
# 4) Else, scrape from raw author block (which may or may not have email)
elif (
author < len(author_info) and author_info[author] != "N/A"
): # check if author_info contains author 'i' and is non-empty
auth_block = author_info[author]
cut_line = -1 if "@" in auth_block else 0 # one line above if email present
info_lines = auth_block.split("\n")
location_query = " ".join(info_lines[cut_line - 1 : cut_line])
if (
len([line for line in location_query if line.isdigit()]) > 8
): # look for tele #
location_query = " ".join(
info_lines[cut_line - 2 : cut_line - 1]
) # take line higher if telephone
query_origin = "raw author block"
else:
location_query = "N/A"
query_origin = "No query"
pa_print.tprint("\nCouldn't find a location to use!")
pa_print.tprint(f"\nLooking for: {location_query}")
pub["author loc queries"].append(location_query)
pub["author query origins"].append(query_origin)
query_location(location_query, query_type, pub, args)
def query_location(location_query, query_type, pub, args):
    # 'query_type' is now only used to print status
# Load cache
try:
with open("./cache/json/location_cache.json", "rb") as fp:
location_cache = orjson.loads(fp.read())
except FileNotFoundError:
pa_print.tprint("\nCreating new location cache!")
location_cache = {"N/A": "N/A"}
# Not cached
if location_query not in location_cache:
        try:
            ockey = args.ockey or os.getenv("OCKEY")
            # OpenCageGeocode() never returns None, so validate the key itself
            if not ockey:
                raise Exception("Error: OpenCage API key not provided")
            geocoder = OpenCageGeocode(ockey)
            # OpenCageGeocode: 2,500 req/day, 1 req/s - https://github.com/OpenCageData/python-opencage-geocoder
            location = geocoder.geocode(
                location_query, language="en", limit=1, no_annotations=1, no_record=1
            )[0]
# Format result
geometry = location["geometry"] # lat/long
components = location["components"] # fine loc info
location_info = (
location["formatted"],
(components["country"], components["continent"]),
(geometry["lat"], geometry["lng"]),
location["confidence"],
) # 1 (>25km) to 10 (<0.25km)
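            # Illustrative shape: ("Oslo, Norway", ("Norway", "Europe"), (59.91, 10.75), 7)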
location_cache[location_query] = location_info
pub[f"{query_type} location info"].append(
location_info[:3]
) # add all location into one column
pub[f"{query_type} location confidence"].append(
location_info[3]
) # confidence in separate column
pa_print.tprint(f"✓ - Parsed {query_type} location: {location_info[0]}")
time.sleep(1 + random.random())
except Exception as e: # API fails
# location_cache[location_query] = 'N/A'
pub[f"{query_type} location info"].append("N/A")
pub[f"{query_type} location confidence"].append("N/A")
err_info = "x - Could not parse {0} location: {1}, while querying Open Cage Data an exception of type {2} occurred.\nArguments:\n{3!r}."
err_msg = err_info.format(
query_type, location_query, type(e).__name__, e.args
)
pa_print.tprint(err_msg)
# Save changes to cache
with open("./cache/json/location_cache.json", "wb") as fp:
fp.write(orjson.dumps(location_cache))
# Cached
else:
if location_cache[location_query] != "N/A" and not (location_query == "N/A"):
location_info = location_cache[location_query]
pub[f"{query_type} location info"].append(location_info[:3])
pub[f"{query_type} location confidence"].append(location_info[3])
pa_print.tprint(f"o - Cached {query_type} location: {location_info[0]}")
else:
location_info = "N/A"
pub[f"{query_type} location info"].append("N/A")
pub[f"{query_type} location confidence"].append("N/A")
pa_print.tprint(f"o - Null {query_type} location: {location_info}")
def request_uni(unidomains, author_info, args, pub):
"""Extract university from email handle
:publication from bibtex file
"""
pub_matches = 0
grob_matches = 0
text_matches = 0
author_count = pub["author count"]
    # Internal functions for lookup in unidomains.json
    def lookup_uni(handle, email_type, pub):
        nonlocal pub_matches
        for uni in unidomains:
            if handle in uni["domains"]:
                pub[f"{email_type} author unis"].append((uni["name"], uni["country"]))
                pub_matches += 1
                return True
        return False

    def handle_check(email, email_type, pub):
        handle = email.split("@")[-1].strip()
        # Look for the handle in the json; strip one subdomain at a time and retry until found
        uni_match = lookup_uni(handle, email_type, pub)
        while not uni_match and handle.count(".") > 1:
            handle = handle.split(".", 1)[-1]
            uni_match = lookup_uni(handle, email_type, pub)
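    # e.g. "student.ifi.uio.no" is retried as "ifi.uio.no", then "uio.no",
    # until a domain matches unidomains.json or only one dot remains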
# 1) Using grobid derived emails to choose handle
email_type = "grobid"
for author in range(author_count):
email = pub["grobid emails"][author]
if email != "N/A": # check for valid email
handle_check(email, email_type, pub)
grob_matches = pub_matches
# 2) Using scraped author info block from header if not enough emails
if len(author_info) > 0 and (grob_matches < author_count):
email_type = "text"
        for author in author_info:  # ! could be more author blocks than authors exist
            info_emails = email_regex.findall(author)  # look for '@handle.tld' in block
            for email in info_emails:  # case: multiple emails within one author block #! (will overwrite)
                if email != "N/A":
                    handle_check(email, email_type, pub)
# Fill in missing unis with 'N/A' # ! author block not linked in order with authors
    for email_type, author in [
        (t, a) for t in ["grobid", "text"] for a in range(author_count)
    ]:
        try:
            pub[f"{email_type} author unis"][author]
        except IndexError:
            pub[f"{email_type} author unis"].append("N/A")
text_matches = pub_matches - grob_matches
pub_matches = max(text_matches, grob_matches)
pa_print.tprint(f"o - Found {pub_matches} uni's from email handles\n")