-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
123 lines (112 loc) · 4.3 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
""" Utilities for Computational Assyriology """
import requests
from tqdm.auto import tqdm
import os
import errno
import zipfile
import json
import pandas as pd
# Module-level accumulator: parsejson() appends one dict per lemma (and one per
# "strict" node) to this list, and get_lemmas() returns it.
lemm_l = []
# Running context shared between get_lemmas() and parsejson(): "id_text" is set
# by get_lemmas() for the text being parsed, "label" is updated by parsejson()
# whenever a node carries a "label", and a temporary "field" key is added and
# removed by parsejson() between "field-start" and "field-end" nodes.
meta_d = {"label": None, "id_text": None}
# Keys copied by parsejson() from nodes whose "strict" value is "1"
# (presumably the $-lines describing breakage -- TODO confirm).
dollar_keys = ["extent", "scope", "state"]
def make_dirs(x):
    """Create every directory named in x that does not exist yet.

    Directories that already exist are left untouched. Missing parent
    directories are created as well, so nested names such as
    "output/csv" work.

    Parameter is a list with directory names; a single name given as a
    bare string (or path object) is accepted too and treated as one
    directory rather than being iterated character by character.

    Raises OSError when a directory cannot be created, for instance
    because of missing permissions or because a regular file already
    occupies the requested path."""
    if isinstance(x, (str, bytes, os.PathLike)):
        x = [x]
    for path in x:
        # exist_ok=True makes "already there" a no-op while still raising
        # when the path exists but is not a directory.
        os.makedirs(path, exist_ok=True)
def format_project_list(x):
    """Turn a comma-separated string of project names into a clean list.

    The string is lower-cased and split on commas, and surrounding
    whitespace is removed from every name. Empty entries (caused by an
    empty string, doubled commas or a trailing comma) are dropped so that
    no empty project name is handed on to the download step.

    Parameter is a string such as "saao/saa01, rinap/rinap1";
    return is a list of project names in the order given."""
    names = (name.strip() for name in x.lower().split(','))
    return [name for name in names if name]
def oracc_download(p, timeout=30):
    """Downloads ZIP with JSON files from
    ORACC server and saves each one as
    jsonzip/<project>.zip (slashes in the project
    name are replaced by hyphens in the file name).

    Parameter p is a list with ORACC project names.
    Parameter timeout is the number of seconds to wait
    for the server before giving up on a project; it is
    passed on to requests.get.

    Return is the same list of names, in the order given,
    minus doublets and minus projects that do not exist
    on the server or could not be downloaded."""
    CHUNK = 16 * 1024
    # dict.fromkeys removes duplicates but, unlike set(), keeps the order
    # in which the projects were requested, so the result is deterministic.
    p = list(dict.fromkeys(p))
    projects = p.copy()
    for project in p:
        proj = project.replace('/', '-')
        url = "http://oracc.museum.upenn.edu/" + project + "/json/" + proj + ".zip"
        file = 'jsonzip/' + proj + '.zip'
        try:
            # Without a timeout a stalled server would block the run forever.
            with requests.get(url, stream=True, timeout=timeout) as r:
                if r.status_code != 200:
                    tqdm.write(url + " does not exist.")
                    projects.remove(project)
                    continue
                tqdm.write("Saving " + url + " as " + file)
                with open(file, 'wb') as f:
                    for c in tqdm(r.iter_content(chunk_size=CHUNK), desc=project):
                        f.write(c)
        except requests.RequestException as exc:
            # A network failure on one project should not abort the others;
            # report it and leave the project out of the returned list.
            tqdm.write(url + " could not be downloaded: " + str(exc))
            projects.remove(project)
    return projects
def parsejson(text):
    """Walk the "cdl" list of text recursively and collect word records.

    Every node that has its own "cdl" list is descended into first. While
    walking, the shared meta_d dictionary is kept up to date (latest
    "label", and the current "field" between "field-start" and
    "field-end" nodes). Two kinds of nodes produce a record that is
    appended to the module-level list lemm_l:

    - nodes with an "f" entry: the "f" dictionary itself, extended with
      id_word, label, id_text and (when inside a field) field;
    - nodes whose "strict" value is "1": the values of the keys listed
      in dollar_keys, plus id_word and id_text.

    Nothing is returned; results are found in lemm_l."""
    for node in text["cdl"]:
        # Containers first, so nested content is handled before this
        # node's own keys are looked at.
        if "cdl" in node:
            parsejson(node)

        if "label" in node:
            meta_d["label"] = node["label"]

        if "type" in node:
            node_type = node["type"]
            if node_type == "field-start":
                # Sign lists mark fields such as sign, pronunciation,
                # translation; remember which one we are in.
                meta_d["field"] = node["subtype"]
            elif node_type == "field-end":
                # Forget the field so it is not copied to later lemmas
                # that do not belong to any field.
                meta_d.pop("field", None)

        if "f" in node:
            record = node["f"]
            record["id_word"] = node["ref"]
            record['label'] = meta_d["label"]
            record["id_text"] = meta_d["id_text"]
            if "field" in meta_d:
                record["field"] = meta_d["field"]
            lemm_l.append(record)

        is_strict = "strict" in node and node["strict"] == "1"
        if is_strict:
            record = {name: node[name] for name in dollar_keys}
            record["id_word"] = node["ref"]
            record["id_text"] = meta_d["id_text"]
            lemm_l.append(record)
def get_lemmas(p):
    """Parse the downloaded ZIP of every project in p into word records.

    For each project the file jsonzip/<project>.zip (slashes replaced by
    hyphens) is opened and every member whose name contains "corpusjson"
    and ends in ".json" is decoded as UTF-8 JSON and handed to parsejson.
    A project whose ZIP is missing or invalid, and a text that cannot be
    read or parsed, is reported with a printed message and skipped.

    Parameter is a list with ORACC project names;
    return is a list of dictionaries (one per word or strict node)
    holding only the records collected during this call."""
    # parsejson() appends to the module-level list; empty it first so that
    # calling this function again does not return earlier results twice.
    del lemm_l[:]
    for project in p:
        file = "jsonzip/" + project.replace("/", "-") + ".zip"
        try:
            z = zipfile.ZipFile(file)
        except (OSError, zipfile.BadZipFile):
            print(file + " does not exist or is not a proper ZIP file")
            continue
        # The with-block closes the archive even if something unexpected
        # escapes from the loop below.
        with z:
            files = [name for name in z.namelist()
                     if "corpusjson" in name and name.endswith('.json')]
            for filename in tqdm(files, desc=project):
                # The eight characters before ".json" are appended to the
                # project name to form the text identifier.
                id_text = project + filename[-13:-5]
                meta_d["id_text"] = id_text
                # Do not let label/field context of the previous text leak
                # into this one.
                meta_d["label"] = None
                meta_d.pop("field", None)
                try:
                    st = z.read(filename).decode('utf-8')
                    data_json = json.loads(st)
                    parsejson(data_json)
                except Exception:
                    # Deliberately broad: one damaged or incomplete text
                    # must not stop the batch. Unlike a bare except this
                    # still lets KeyboardInterrupt and SystemExit through.
                    print(id_text + ' is not available or not complete')
    # Return a copy so that a later call cannot alter this result.
    return list(lemm_l)
def dataformat(lemm_list):
    """Turn the list of word dictionaries into a pandas DataFrame.

    Missing values are replaced by empty strings. In the columns gw and
    sense (when present) spaces are replaced by hyphens and commas are
    removed. A column id_line is added, holding the integer found
    between the first and second dot of id_word.

    Parameter is a list of dictionaries, each with at least an id_word
    entry of the form "<text>.<line>[.<word>]".
    Return is the DataFrame; for an empty list it is an empty DataFrame
    with the columns id_word and id_line."""
    words_df = pd.DataFrame(lemm_list).fillna('')
    if words_df.empty:
        # Nothing was parsed (for instance because every download failed):
        # hand back an empty table instead of failing on the missing
        # id_word column below.
        return pd.DataFrame(columns=['id_word', 'id_line'])
    findreplace = {' ': '-', ',': ''}
    words_df = words_df.replace({'gw': findreplace, 'sense': findreplace}, regex=True)
    words_df['id_line'] = [int(wordid.split('.')[1]) for wordid in words_df['id_word']]
    return words_df
def get_data(x):
    """Run the whole pipeline for the projects named in x.

    Makes sure the directories jsonzip and output exist, splits the
    comma-separated string x into project names, downloads the ZIP of
    each project, parses the JSON inside and returns the result of
    dataformat (a pandas DataFrame)."""
    make_dirs(["jsonzip", "output"])
    requested = format_project_list(x)

    print("Downloading JSON")
    available = oracc_download(requested)

    print("Parsing JSON")
    return dataformat(get_lemmas(available))