-
Notifications
You must be signed in to change notification settings - Fork 5
/
inat_taxonomy.py
318 lines (285 loc) · 12.3 KB
/
inat_taxonomy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import csv
import io
import locale
import os
import sys
import time
import zipfile
from dataclasses import dataclass
from typing import Dict, List, Optional

import inat_api
# The directory where this Python script is located.
INSTALL_DIR = os.path.dirname(__file__)
if os.path.islink(INSTALL_DIR):
    # The directory itself is a symbolic link.  Resolve it to the directory
    # it points to; joining the link path with the dirname of its target
    # would yield the *parent* of the target instead.
    INSTALL_DIR = os.path.realpath(INSTALL_DIR)

# This zip file contains the taxonomy and all common names.
# Download https://www.inaturalist.org/taxa/inaturalist-taxonomy.dwca.zip and
# leave this zip file in directory 'inaturalist-taxonomy'. Do not extract the
# files from this zip archive.
INAT_TAXONOMY = os.path.join(INSTALL_DIR, 'inaturalist-taxonomy',
                             'inaturalist-taxonomy.dwca.zip')
# The tree has one artificial root node, the parent of all kingdoms.
ROOT_TAXON_ID = 48460
ROOT_NAME = 'Life'
ROOT_RANK_LEVEL = 100

# Numeric rank-level -> rank name.  Larger numbers are closer to the root.
# Entries marked "changed" use a level of their own here, whereas iNaturalist
# gives them the same level as the rank named in the comment.
gRankLevel2Name = {
    ROOT_RANK_LEVEL: 'stateofmatter',  # used for the parent of kingdoms
    70: 'kingdom',
    67: 'subkingdom',
    60: 'phylum',
    57: 'subphylum',
    53: 'superclass',
    50: 'class',
    47: 'subclass',
    45: 'infraclass',
    44: 'subterclass',
    43: 'superorder',
    40: 'order',
    37: 'suborder',
    35: 'infraorder',
    34.5: 'parvorder',
    34: 'zoosection',
    33.5: 'zoosubsection',
    33: 'superfamily',
    32: 'epifamily',
    30: 'family',
    27: 'subfamily',
    26: 'supertribe',
    25: 'tribe',
    24: 'subtribe',
    20: 'genus',
    19: 'genushybrid',  # changed, was same as genus in iNaturalist
    15: 'subgenus',
    13: 'section',
    12: 'subsection',
    11: 'complex',
    10: 'species',
    9: 'hybrid',        # changed, was same as species in iNaturalist
    5: 'subspecies',
    4: 'variety',       # changed, was same as subspecies in iNaturalist
    3: 'form',          # changed, was same as subspecies in iNaturalist
    2: 'infrahybrid',   # changed, was same as subspecies in iNaturalist
}

# Reverse table: rank name -> numeric rank-level.
gName2RankLevel = {rank_name: level
                   for level, rank_name in gRankLevel2Name.items()}

KINGDOM_RANK_LEVEL = gName2RankLevel['kingdom']
def get_rank_level(rank):
    """
    Return the numeric rank-level stored in gName2RankLevel for the rank
    name `rank`.  An assertion checks that the name is in the table.
    """
    is_known_rank = rank in gName2RankLevel
    assert is_known_rank
    level = gName2RankLevel[rank]
    return level
def get_rank_name(rank_level, default_name = 'clade'):
    """
    Return the rank name stored in gRankLevel2Name for `rank_level`, or
    `default_name` when that level has no entry.
    """
    return gRankLevel2Name.get(rank_level, default_name)
@dataclass(frozen=True)
class Taxon:
    """Immutable record of one taxon of the iNaturalist taxonomy."""
    id: int                   # taxon id, from column 'id' of 'taxa.csv'
    parent_id: Optional[int]  # id of the parent taxon; the loader stores None
                              # for the root taxon when it has no parent
    name: str                 # scientific name
    rank_level: float         # numeric rank-level (some levels are fractional,
                              # e.g. 34.5); the loader uses -1 if unknown
# Lookup tables over the iNaturalist taxa.  They start out empty and are only
# loaded when a taxonomic tree needs to be computed from a label file;
# load_inat_taxonomy() fills them.

# maps taxon name to list of taxa
gName2Taxa: Dict[str, List[Taxon]] = dict()

# maps taxon id to taxon
gId2Taxon: Dict[int, Taxon] = dict()
def load_inat_taxonomy():
    """
    Load all iNaturalist taxa from member 'taxa.csv' of archive INAT_TAXONOMY.

    Fills the module-level tables gName2Taxa (name -> list of Taxon) and
    gId2Taxon (id -> Taxon).  Does nothing if both tables are already
    populated.  A rank name that is missing from gName2RankLevel is looked up
    through the iNaturalist API and added to the rank tables; if that lookup
    yields nothing the rank gets level -1.

    Returns True on success.  On any error a message is printed, both tables
    are reset to empty and False is returned; no exception escapes.
    """
    global gName2Taxa
    global gId2Taxon

    if gName2Taxa and gId2Taxon:
        return True  # already loaded

    print('Loading iNaturalist taxonomy...')
    start_time = time.time()
    gName2Taxa = {}
    gId2Taxon = {}

    try:
        # NOTE(review): 'taxa.csv' is decoded as latin-1 here while the
        # vernacular-name members of the same archive are read as utf-8 in
        # annotate_common_names() -- confirm latin-1 is intended.
        with zipfile.ZipFile(INAT_TAXONOMY, 'r') as zf, \
             zf.open('taxa.csv', 'r') as zfile, \
             io.TextIOWrapper(zfile, encoding='latin-1') as csvfile:
            for row in csv.DictReader(csvfile):
                taxon_id = int(row['id'])

                # The parent is given as a '/'-separated reference; only its
                # last component, the numeric id, is used.
                parent = row['parentNameUsageID'].split('/')[-1]
                if parent:
                    parent_id = int(parent)
                elif taxon_id != ROOT_TAXON_ID:
                    parent_id = ROOT_TAXON_ID  # parentless taxa hang off root
                else:
                    parent_id = None           # the root has no parent

                name = row['scientificName']
                rank = row['taxonRank']

                if rank not in gName2RankLevel:
                    # Rank unknown to our table, ask the API for its level.
                    # Assumes the response is a mapping whose 'results' is a
                    # list of taxon mappings -- TODO confirm against inat_api.
                    response = inat_api.get_taxa_by_id(taxon_id)
                    results = response['results'] \
                        if response and 'results' in response else None
                    if results:
                        rank_level = results[0]['rank_level']
                        gName2RankLevel[rank] = rank_level
                        if rank_level not in gRankLevel2Name:
                            gRankLevel2Name[rank_level] = rank
                        print(f"Please add rank '{rank}' to gName2Rank"
                              f"Level, numeric value {rank_level}.")
                    else:
                        # No response or an empty result list: an empty list
                        # must not abort the whole load with an IndexError.
                        gName2RankLevel[rank] = -1
                rank_level = gName2RankLevel[rank]

                if taxon_id in gId2Taxon:
                    raise ValueError(f'duplicate taxon id {taxon_id}')
                inat_taxon = Taxon(taxon_id, parent_id, name, rank_level)
                gName2Taxa.setdefault(name, []).append(inat_taxon)
                gId2Taxon[taxon_id] = inat_taxon

                # progress indicator: a dot every 10,000 taxa, the running
                # count every 100,000
                loaded = len(gId2Taxon)
                if loaded % 10000 == 0:
                    print(f' {loaded:,} ' if loaded % 100000 == 0 else '.',
                          end='')
                    sys.stdout.flush()

        if ROOT_TAXON_ID not in gId2Taxon:
            raise ValueError(f'root taxon {ROOT_TAXON_ID} is missing')
        print(f' {len(gId2Taxon):,}.')
        print(f'Loaded iNaturalist taxonomy of {len(gId2Taxon):,} taxa '
              f'in {time.time()-start_time:.1f} secs.')
        return True

    except Exception as e:
        # Boundary handler: report the problem and leave the tables empty so
        # that a later call retries the load.
        print("Cannot load taxonomy 'taxa.csv' from archive "
              f"'{INAT_TAXONOMY}': {str(e)}.")
        gName2Taxa = {}
        gId2Taxon = {}
        return False
def beautify_common_name(name):
    """
    Capitalize (most) words in common name; helper function for common names.

    A trailing ' [paraphyletic]' is dropped.  The first character of every
    hyphen-separated piece and of every whitespace-separated word is
    upper-cased, except for the word 'and' and for words ending in '.',
    which are left as they are.  Runs of whitespace collapse to one space.
    Empty names and empty hyphen pieces ('-foo', 'foo--bar') are tolerated.
    """
    suffix = ' [paraphyletic]'
    if name.endswith(suffix):
        name = name[:-len(suffix)]  # fix dicots

    def capitalize_first(word):
        # word[:1] rather than word[0]: an empty piece stays empty instead
        # of raising IndexError.
        return word[:1].upper() + word[1:]

    name = '-'.join(capitalize_first(piece) for piece in name.split('-'))
    return ' '.join(word if word == 'and' or word.endswith('.')
                    else capitalize_first(word)
                    for word in name.split())
def annotate_common_names(id2taxon, all_common_names = False):
    """
    Load the common names in our language, annotate taxonomic tree with them.

    The parameter `id2taxon' includes the taxa we are interested in; it maps
    taxon ids to objects whose `common_name' attribute is read and assigned
    here.  With `all_common_names' false only taxa whose `common_name' is
    still None receive a name; otherwise further names are appended,
    separated by '; '.

    The language comes from the default locale; 'en' is used when it is
    undetermined.  Archive members 'VernacularNames-*.csv' whose language
    equals the locale are read first, followed by members whose language is
    a proper prefix of it (e.g. 'en' for 'en_US').  Returns None; problems
    are reported with a printed message.
    """
    start_time = time.time()
    # NOTE(review): locale.getdefaultlocale() is deprecated since Python 3.11.
    language, _ = locale.getdefaultlocale()
    # It yields None when no locale can be determined; without this fallback
    # the comparisons below fail with a TypeError.
    if not language or language in ['C', 'C.UTF-8', 'POSIX']:
        language = 'en'

    if not os.path.isfile(INAT_TAXONOMY):
        print("Cannot load common names, archive "
              f"'{INAT_TAXONOMY}' does not exist.")
        return

    try:
        with zipfile.ZipFile(INAT_TAXONOMY, 'r') as zf:
            perfect_match = []
            other_matches = []

            # check all common names files for names in our language
            for fname in zf.namelist():
                if not (fname.startswith("VernacularNames-") and
                        fname.endswith(".csv")):
                    continue
                with zf.open(fname, 'r') as zfile, \
                     io.TextIOWrapper(zfile, encoding='utf-8') as csvf:
                    # only the first data row is inspected to learn the
                    # language of the file
                    first_row = next(csv.DictReader(csvf), None)
                if first_row is None:
                    continue  # file without data rows
                lang = first_row['language']
                if lang == language:
                    perfect_match.append(fname)   # en vs en
                elif len(lang) < len(language) and \
                     language.startswith(lang):
                    other_matches.append(fname)   # en vs en_US

            if not perfect_match and not other_matches:
                print(f"Cannot find common names for language '{language}'.")
                return

            # annotate the taxa with common names
            total_names = loaded_names = 0
            for fname in perfect_match + other_matches:
                print(f"Reading common names from '{INAT_TAXONOMY}' "
                      f"member '{fname}'...")
                with zf.open(fname, 'r') as zfile, \
                     io.TextIOWrapper(zfile, encoding='utf-8') as csvf:
                    for row in csv.DictReader(csvf):
                        total_names += 1
                        taxon_id = int(row['id'])
                        if taxon_id not in id2taxon:
                            continue
                        taxon = id2taxon[taxon_id]
                        if not all_common_names and \
                           taxon.common_name is not None:
                            continue  # keep the first name only
                        loaded_names += 1
                        cname = beautify_common_name(row['vernacularName'])
                        if taxon.common_name is None:
                            taxon.common_name = cname
                        else:
                            taxon.common_name += '; ' + cname

        # presumably the '-1' leaves the root node out of the count
        print(f'Read {total_names:,} common names in '
              f'{time.time()-start_time:.1f} secs, loaded {loaded_names:,} '
              f'in language "{language}" for {len(id2taxon)-1:,} taxa.')

    except Exception as e:
        print(f"Cannot load common names from archive '{INAT_TAXONOMY}':"
              f" {str(e)}.")
def get_ancestors(id, ancestors):
    """
    Append the taxon with the given id, preceded by its ancestors, to list
    `ancestors'.  The appended instances of Taxon are ordered from the
    kingdom down; the walk towards the root stops at the first taxon whose
    rank-level is not below KINGDOM_RANK_LEVEL.
    """
    lineage = []  # collected bottom-up, appended top-down
    current_id = id
    while True:
        node = gId2Taxon[current_id]
        lineage.append(node)
        if not node.rank_level < KINGDOM_RANK_LEVEL:
            break
        current_id = node.parent_id
    ancestors.extend(reversed(lineage))
def lookup_id(name, desired_ranks = ['species', 'subspecies']):
    """
    Lookup by name, returns a pair, a Taxon and its ancestors, a list of
    Taxon ordered from the kingdom down.

    If several loaded taxa share the name, a warning listing them is printed
    and the last one whose rank is in `desired_ranks' is chosen, or the first
    one if none qualifies.  `desired_ranks' is only read, never modified.

    A name that is not in the loaded taxonomy is likely a taxon change; it is
    queried through the iNaturalist API.  Should the API results map to
    several loaded taxa, their common ancestor is returned.

    Returns None when the taxonomy is not loaded, when the API lookup fails,
    or when none of the API results is in the loaded taxonomy.
    """
    if not gName2Taxa:
        return None  # taxonomy not loaded

    if name in gName2Taxa:
        taxa = gName2Taxa[name]
        if len(taxa) > 1:
            print(f"Warning: multiple taxa named '{name}':", end='')
            prefix = ' '
            taxon = None
            for t in taxa:
                rank = get_rank_name(t.rank_level)
                print(f"{prefix}{rank} {t.id}", end='')
                if rank in desired_ranks:
                    taxon = t
                prefix = ', '
            if taxon is None:
                taxon = taxa[0]
            rank = get_rank_name(taxon.rank_level)
            print(f"; choosing {rank}.")
        else:
            taxon = taxa[0]
    else:
        # likely taxon change, query iNat API
        response = inat_api.get_taxa({ 'q'         : name,
                                       'all_names' : 'true',
                                       'per_page'  : 200 })
        # Same guard as in load_inat_taxonomy(): a response without
        # 'results' counts as a failed lookup instead of raising KeyError.
        if not response or 'results' not in response:
            print(f"API lookup for name '{name}' failed.")
            return None
        results = response['results']
        if len(results) > 1:
            # more than one taxon, find the one that used to have this name
            exact_matches = [result for result in results
                             for nam in result['names']
                             if nam['locale'] == 'sci' and nam['name'] == name]
            if exact_matches:
                results = exact_matches

        # keep only results that exist in the loaded taxonomy
        taxa = {gId2Taxon[result['id']] for result in results
                if result['id'] in gId2Taxon}
        if not taxa:
            return None

        while len(taxa) > 1:
            # multiple taxa, find their common ancestor: repeatedly replace
            # the taxa of lowest rank-level with their parents until the set
            # collapses to a single taxon
            min_rank_level = min(t.rank_level for t in taxa)
            taxa = {gId2Taxon[t.parent_id]
                    if t.rank_level == min_rank_level else t
                    for t in taxa}
        taxon = taxa.pop()

    ancestors = []
    if taxon.rank_level < KINGDOM_RANK_LEVEL:
        get_ancestors(taxon.parent_id, ancestors)
    return (taxon, ancestors)
if __name__ == '__main__':
    # This file is a library module and is not meant to be run as a script.
    # sys.exit() prints the message and exits with a non-zero status; an
    # assert would be stripped under 'python -O' and never show the text.
    sys.exit('Not a top-level Python module!')