-
Notifications
You must be signed in to change notification settings - Fork 4
/
land_cover_utils.py
318 lines (288 loc) · 12 KB
/
land_cover_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
'''
utils.py
Author: Lucas Hu
SEN12MS Land-Cover util functions
'''
import os
import numpy as np
from sklearn.preprocessing import LabelEncoder
from skimage.util.shape import view_as_blocks
from scipy import stats
from sen12ms_dataLoader import SEN12MSDataset, \
Seasons, Sensor, S1Bands, S2Bands, LCBands
def json_keys_to_int(x):
    '''
    Helper function to parse JSON with ints as keys.

    Input: x -- any object produced by a JSON decoder (object_hook value)
    Output: dict with keys converted to int when possible, otherwise x unchanged
    '''
    try:
        return {int(k): v for k, v in x.items()}
    except (AttributeError, TypeError, ValueError):
        # AttributeError: x has no .items() (not a dict);
        # ValueError/TypeError: a key is not int-parsable.
        # Previously a bare `except:` -- it also swallowed KeyboardInterrupt etc.
        return x
def make_history_json_serializable(history):
    '''
    Make a keras history.history dict JSON serializable.

    Keeps only list-valued entries, casting each element to a plain
    Python float (numpy scalars are not JSON serializable).
    '''
    return {
        key: [float(item) for item in values]
        for key, values in history.items()
        if isinstance(values, list)
    }
def get_label_encoder(config):
    '''
    Uses config dict's landuse/dfc class info to get an sklearn label_encoder.

    Input: config (dict) -- reads training_params.label_scheme plus the
        class-description / merge / removal keys for that scheme
    Output: sklearn LabelEncoder whose classes_ holds the sorted class
        numbers remaining after merging/removal (with 0 reserved when
        classes are ignored/masked), or None for an unknown label_scheme
    '''
    labels = config['training_params']['label_scheme']
    # get remaining classes after merging
    if labels == 'landuse':
        merged_classes = set(config['landuse_class_mappings'].keys())
        all_classes = set(config['landuse_class_descriptions'].keys())
        remaining_classes = all_classes - merged_classes
    elif labels == 'dfc':
        removed_classes = set(config['dfc_removed_classes']) | set(config['dfc_ignored_classes'])
        all_classes = set(config['dfc_class_descriptions'].keys())
        remaining_classes = all_classes - removed_classes
    else:
        print('get_label_encoder: unknown labels parameter!')
        return None
    # sort class_nums
    class_nums_sorted = sorted(remaining_classes)
    # if we're masking some classes, reserve '0' index.
    # .get() avoids a KeyError when '<scheme>_ignored_classes' is absent
    # (previously config[...] raised for configs without that key).
    ignored_classes = config.get(f'{labels}_ignored_classes')
    if ignored_classes is not None and len(ignored_classes) > 0:
        class_nums_sorted = [0] + class_nums_sorted
    # get label_encoder
    label_encoder = LabelEncoder()
    label_encoder.classes_ = np.array(class_nums_sorted)
    return label_encoder
def get_continents_label_encoder(config):
    '''
    Uses config dict's all_continents info to get an sklearn label_encoder.
    Output: sklearn LabelEncoder over config['all_continents']
    '''
    encoder = LabelEncoder()
    encoder.classes_ = np.array(config['all_continents'])
    return encoder
def get_seasons_label_encoder(config):
    '''
    Uses config dict's all_seasons info to get an sklearn label_encoder.
    Output: sklearn LabelEncoder over config['all_seasons']
    '''
    encoder = LabelEncoder()
    encoder.classes_ = np.array(config['all_seasons'])
    return encoder
def patch_to_subpatches(patch, config):
    '''
    Split one square patch into non-overlapping square subpatches.
    Input: single patch, channels-last -- block_shape uses patch.shape[-1]
        as the band axis, so the layout is (W, H, B), not (B, W, H)
    Output: subpatches stacked along axis 0: (N, subpatch_size, subpatch_size, B)
        where N = (W / subpatch_size) * (H / subpatch_size)
        NOTE(review): np.squeeze also drops block-grid axes of length 1,
        so a patch exactly subpatch_size wide/tall may change the output
        rank -- TODO confirm against callers
    '''
    subpatch_size = config['training_params']['subpatch_size']
    # view_as_blocks requires W and H to be exact multiples of subpatch_size
    subpatches = view_as_blocks(patch, \
        block_shape=(subpatch_size, subpatch_size, patch.shape[-1]))
    # drop the singleton channel-block axis, then flatten the block grid
    subpatches = np.squeeze(subpatches)
    subpatches = np.concatenate(subpatches, axis=0)
    return subpatches
def scene_to_subpatches(patches, config, patch_ids=None):
    '''
    Split square patches into smaller square sub-patches.

    Input: patches of shape (D, W, H, B) (channels-last per patch);
        optional patch_ids (length D) to replicate per subpatch
    Output: subpatches of shape (N, subpatch_size, subpatch_size, B);
        if patch_ids is given, also the per-subpatch id list (length N)
        N = D * (W / subpatch_size) * (H / subpatch_size)
    '''
    all_subpatches = [] # list of each patch's subpatch array
    all_patch_ids = []
    for i, patch in enumerate(patches):
        subpatches = patch_to_subpatches(patch, config)
        all_subpatches.append(subpatches)
        if patch_ids is not None:
            # replicate this patch's id once per generated subpatch
            all_patch_ids.extend([patch_ids[i]] * len(subpatches))
    # concat all subpatches
    all_subpatches = np.concatenate(all_subpatches, axis=0)
    if patch_ids is not None:
        # BUGFIX: previously returned the original patch_ids (length D)
        # instead of the replicated per-subpatch ids (length N)
        return all_subpatches, all_patch_ids
    else:
        return all_subpatches
def combine_landuse_classes(landuse, config):
    '''
    Merge land-use classes per config['landuse_class_mappings']
    (see Section 5 of SEN12MS paper).
    Input: land use patches (Shape: D, W, H), config
    Output: patches with every from_class value replaced by its to_class
    '''
    for src_class, dst_class in config['landuse_class_mappings'].items():
        landuse = np.where(landuse == src_class, dst_class, landuse)
    return landuse
def get_landuse_labels(lc, config):
    '''
    Input: lc (land cover bands, Shape: D, W, H, B=4), config
    Output: majority (mode) LCCS land-use class per patch, Shape: D
    '''
    # select the land-use band, merge classes, then take the per-patch mode
    landuse = combine_landuse_classes(
        lc[:, :, :, LCBands.landuse.value - 1], config)
    flattened = landuse.reshape(landuse.shape[0], -1)
    modes, _ = stats.mode(flattened, axis=1)
    return np.ravel(modes)
def get_represented_landuse_classes_from_onehot_labels(y, label_encoder):
    '''
    Input: y (one-hot labels, 2D array), label_encoder
    Output: list of landuse class numbers that do appear in y
    '''
    labels = np.argmax(y, axis=1)
    landuse_classes = label_encoder.inverse_transform(labels)
    # dead code removed: all_classes / missing_classes were computed here
    # but never used (see get_missing_landuse_classes_from_onehot_labels)
    represented_classes = set(np.unique(landuse_classes).tolist())
    return list(represented_classes)
def get_missing_landuse_classes_from_onehot_labels(y, label_encoder):
    '''
    Input: y (one-hot labels, 2D array), label_encoder
    Output: list of landuse class numbers that do not appear in y
    '''
    all_classes = set(label_encoder.classes_.tolist())
    # BUGFIX: label_encoder must be forwarded -- the previous call omitted
    # it and raised TypeError (missing positional argument)
    represented_classes = set(
        get_represented_landuse_classes_from_onehot_labels(y, label_encoder))
    missing_classes = all_classes - represented_classes
    return list(missing_classes)
def get_scene_dirs_for_continent(continent, config, mode='segmentation'):
    '''
    Input: continent (e.g. North_America), config
    Output: list of scene directories for that continent
    '''
    # pick the dataset root for the requested mode
    if mode == 'subpatches':
        dataset_dir = config['subpatches_dataset_dir']
    else:
        dataset_dir = config['segmentation_dataset_dir']
    # find <continent>-<season> subdirs whose name mentions this continent
    matching_subdirs = [
        entry.path for entry in os.scandir(dataset_dir)
        if entry.is_dir() and continent in entry.name
    ]
    # traverse 1 level down to collect scene directories
    all_scene_dirs = []
    for subdir in matching_subdirs:
        all_scene_dirs.extend(
            entry.path for entry in os.scandir(subdir)
            if entry.is_dir() and 'scene' in entry.name
        )
    return all_scene_dirs
def get_scene_dirs_for_season(season, config, mode='segmentation'):
    '''
    Input: season (e.g. fall), config
    Output: list of scene directories for that season
    '''
    # pick the dataset root for the requested mode
    if mode == 'subpatches':
        dataset_dir = config['subpatches_dataset_dir']
    else:
        dataset_dir = config['segmentation_dataset_dir']
    # find <continent>-<season> subdirs whose name mentions this season
    matching_subdirs = [
        entry.path for entry in os.scandir(dataset_dir)
        if entry.is_dir() and season in entry.name
    ]
    # traverse 1 level down to collect scene directories
    all_scene_dirs = []
    for subdir in matching_subdirs:
        all_scene_dirs.extend(
            entry.path for entry in os.scandir(subdir)
            if entry.is_dir() and 'scene' in entry.name
        )
    return all_scene_dirs
def get_scene_dirs_for_continent_season(continent, season, config, mode='segmentation'):
    '''
    Input: continent (e.g. Africa), season (e.g. fall), config
    Output: list of scene directories for that continent-season
    '''
    # pick the dataset root for the requested mode
    if mode == 'subpatches':
        dataset_dir = config['subpatches_dataset_dir']
    else:
        dataset_dir = config['segmentation_dataset_dir']
    # find subdirs whose name mentions both the continent and the season
    matching_subdirs = [
        entry.path for entry in os.scandir(dataset_dir)
        if entry.is_dir()
        and continent in entry.name
        and season in entry.name
    ]
    # traverse 1 level down to collect scene directories
    all_scene_dirs = []
    for subdir in matching_subdirs:
        all_scene_dirs.extend(
            entry.path for entry in os.scandir(subdir)
            if entry.is_dir() and 'scene' in entry.name
        )
    return all_scene_dirs
def get_segmentation_patch_paths_for_scene_dir(scene_dir):
    '''
    Input: single scene_dir
    Output: list of segmentation patch directory paths under scene_dir
    '''
    # dead local 'all_subpatch_paths' removed (was assigned, never used);
    # docstring previously said '.npy paths' but this returns directories
    return [entry.path for entry in os.scandir(scene_dir)
            if entry.is_dir() and 'patch' in entry.name]
def get_segmentation_patch_paths_for_scene_dirs(scene_dirs):
    '''
    Input: scene_dirs (list)
    Output: list of patch directory paths across all scenes
    '''
    patch_paths = []
    # traverse scenes, concatenating each scene's patch dirs
    for scene_dir in scene_dirs:
        patch_paths.extend(get_segmentation_patch_paths_for_scene_dir(scene_dir))
    return patch_paths
def get_subpatch_paths_for_scene_dir(scene_dir):
    '''
    Input: single scene_dir
    Output: list of subpatch .npy paths
    '''
    subpatch_paths = []
    # patch dirs directly under the scene dir
    patch_dirs = [entry.path for entry in os.scandir(scene_dir)
                  if entry.is_dir() and 'patch' in entry.name]
    for patch_dir in patch_dirs:
        # collect subpatch .npy files inside this patch dir
        # TODO: numeric sort by subpatch_id?
        subpatch_paths.extend(
            entry.path for entry in os.scandir(patch_dir)
            if entry.is_file()
            and 'subpatch' in entry.name
            and entry.name.endswith('.npy'))
    return subpatch_paths
def get_subpatch_paths_for_scene_dirs(scene_dirs):
    '''
    Input: scene_dirs (list)
    Output: list of subpatch .npy paths across all scenes
    '''
    subpatch_paths = []
    # traverse scenes, concatenating each scene's subpatch files
    for scene_dir in scene_dirs:
        subpatch_paths.extend(get_subpatch_paths_for_scene_dir(scene_dir))
    return subpatch_paths
def patch_path_to_geo_info(patch_path):
    ''' get (continent, season, scene_id, patch_id) from patch_path '''
    # everything after the 'sen12ms_' dataset-dir marker:
    # <dataset>/<continent>-<season>/scene_<id>/patch_<id>
    after_marker = patch_path.split('sen12ms_')[1]
    continent_season, scene_part, patch_part = after_marker.split('/')[1:4]
    continent, season = continent_season.split('-')
    return (continent,
            season,
            int(scene_part.split('scene_')[1]),
            int(patch_part.split('patch_')[1]))
def geo_info_to_patch_path(dataset_dir, continent, season, scene, patch):
    ''' get patch path from geo info '''
    # layout: <dataset_dir>/<continent>-<season>/scene_<id>/patch_<id>
    return '/'.join([
        f'{dataset_dir}',
        f'{continent}-{season}',
        f'scene_{scene}',
        f'patch_{patch}',
    ])
def get_patch_paths_in_cluster(image_cluster_df, cluster_index, config, continent=None):
    '''
    Get list of patch paths in a cluster (optionally restricted to a continent).

    Input: image_cluster_df (DataFrame with continent/season/scene_id/patch_id
        columns plus the configured cluster column), cluster_index,
        config, optional continent filter
    Output: list of patch path strings
    '''
    cluster_col = config['kmeans_params']['df_cluster_col']
    # BUGFIX/idiom: sentinel check uses 'is None' (PEP 8), not '== None'
    if continent is None:
        in_continent_cluster = image_cluster_df.loc[
            image_cluster_df[cluster_col] == cluster_index]
    else:
        in_continent_cluster = image_cluster_df.loc[
            (image_cluster_df['continent'] == continent) &
            (image_cluster_df[cluster_col] == cluster_index)]
    patch_paths = []
    for row in in_continent_cluster.itertuples(index=False, name='Patch'):
        patch_path = geo_info_to_patch_path(
            config['segmentation_dataset_dir'],
            row.continent, row.season, row.scene_id, row.patch_id)
        patch_paths.append(patch_path)
    return patch_paths
def get_all_patch_paths_from_df(image_cluster_df, config):
    ''' get all patch_paths from image_cluster_df '''
    dataset_dir = config['segmentation_dataset_dir']
    # one patch path per DataFrame row
    return [
        geo_info_to_patch_path(dataset_dir, row.continent, row.season,
                               row.scene_id, row.patch_id)
        for row in image_cluster_df.itertuples(index=False, name='Patch')
    ]