# bayesian_recursive.py
# Core of the recursive Bayesian classification (RBC) algorithm.
import logging
import numpy as np
import pickle
import os
from typing import List, Dict
from sklearn.mixture import GaussianMixture
from configuration import Config
from benchmark_models.benchmark import main_deepwater
from benchmark_models.watnet.watnet_infer import watnet_infer_main
from tools.spectral_index import get_broadband_index, get_scaled_index
from tools.operations import get_index_pixels_of_interest
from sklearn.linear_model import LogisticRegression
class RBC:
    """ Class containing the core of the recursive Bayesian classification algorithm. RBC stands for
    Recursive Bayesian Classifier.

    Class Attributes
    ----------------
    transition_matrix : np.ndarray
        transition matrix, hardcoded by the user in the configuration file
    densities : List[GaussianMixture]
        list containing the trained Gaussian Mixture Model densities
    classes : List[str]
        list containing the classes to be evaluated for the current scenario
    classes_prior : list
        list containing the prior probability for each evaluated class
    num_classes : int
        number of evaluated classes, i.e. len(classes)
    model : str
        model string identifier
    trained_model : LogisticRegression
        trained Logistic Regression model. It is stored on the class (not on the instance) by
        *set_lr_trained_model*, so it is shared by every RBC object.

    Class Methods
    -------------
    set_lr_trained_model(cls, trained_model)
    update_labels(self, image_all_bands, time_index)
    calculate_prediction(self, image_all_bands)
    calculate_posterior(self, bands, y_pred, pixel_position)
    """

    def __init__(self, transition_matrix: np.ndarray, gmm_densities: "List[GaussianMixture]", classes: list,
                 classes_prior: list, model: str):
        """ Initializes instance of RBC object with the corresponding class attributes.
        """
        self.transition_matrix = transition_matrix
        self.classes = classes
        self.classes_prior = classes_prior
        self.num_classes = len(classes)
        self.model = model
        self.densities = gmm_densities

    @classmethod
    def set_lr_trained_model(cls, trained_model: "LogisticRegression"):
        """ Sets the corresponding value of the trained Logistic Regression (LR) model.

        The model is stored as a class attribute, so the value is visible from all RBC instances.
        """
        cls.trained_model = trained_model

    def update_labels(self, image_all_bands: np.ndarray, time_index: int = 0):
        """ Runs one step of the Bayesian recursion and returns the class predictions.

        Parameters
        ----------
        image_all_bands : np.ndarray
            pixel values for all the bands of the target image
            This parameter must have size (dim_x*dim_y, n_bands):
                - dim_x = Config.image_dimensions[Config.scenario]['dim_x']
                - dim_y = Config.image_dimensions[Config.scenario]['dim_y']
                - n_bands = len(Config.bands_to_read))
        time_index : int
            bayesian recursion time index

        Returns
        -------
        y_pred : np.ndarray
            class index with the highest prior probability/likelihood (base model output) per pixel
        predict_image : np.ndarray
            class index with the highest posterior probability per pixel. Pixels outside the chosen
            scene keep an all-zero posterior.
        prediction_float : np.ndarray
            base model output as returned by *calculate_prediction* (to be used for histogram analysis)
        """
        # Get the index of the pixels that define the scene selected in the configuration file
        self.index_pixels_of_interest = get_index_pixels_of_interest(image_all_bands=image_all_bands,
                                                                     scene_id=Config.scene_id)

        # Calculate the total number of pixels in the image (whole image - training area)
        image_dimensions = Config.image_dimensions[Config.scenario]
        self.total_num_pixels = image_dimensions['dim_x'] * image_dimensions['dim_y']

        # Calculate the prior probability (time_index = 0)
        # or the likelihood or base model posterior (time_index > 0)
        y_pred = self.calculate_prediction(image_all_bands=image_all_bands)
        prediction_float = y_pred  # to return for histogram analysis

        # The recursion indexes the predictions as [pixel, class], so a 3-D output is flattened
        # *before* it is stored as the prior.
        if np.ndim(y_pred) == 3:
            y_pred = y_pred.reshape(self.total_num_pixels, self.num_classes)

        # At time instant 0, the prior probability is equal to the base model output.
        # A copy is stored because calculate_posterior overwrites the prior in place; sharing
        # the buffer would also overwrite y_pred and prediction_float with the posterior.
        if time_index == 0:
            self.prior_probability = y_pred.copy()

        # Initialize array where the posterior probabilities will be stored
        # Later, this array is updated analysing only the pixels in the subscene
        self.posterior_probability = np.zeros(shape=[self.total_num_pixels, self.num_classes])

        # The posterior is calculated pixel by pixel, only for the pixels from the chosen scene
        pixels_of_interest = self.index_pixels_of_interest
        subscene_posterior = np.empty(shape=[pixels_of_interest.shape[0], self.num_classes])
        for row, pixel_position in enumerate(pixels_of_interest):
            subscene_posterior[row, :] = self.calculate_posterior(bands=image_all_bands[pixel_position],
                                                                  y_pred=y_pred,
                                                                  pixel_position=pixel_position)
        self.posterior_probability[pixels_of_interest] = subscene_posterior  # we just save the subscene pixels

        # Calculate the class predictions
        predict_image = self.posterior_probability.argmax(axis=1)
        y_pred = y_pred.argmax(axis=1)  # position with maximum values
        return y_pred, predict_image, prediction_float

    @staticmethod
    def _water_map_to_probabilities(water_map: np.ndarray, norm_constant: float) -> np.ndarray:
        """ Turns a water map into smoothed two-column probabilities [water_map, 1 - water_map].

        *norm_constant* is added to both columns before each row is normalized to sum to one. The
        calculation is done in float64 so that integer water maps are not truncated.
        """
        water_map = np.asarray(water_map, dtype=np.float64).reshape(-1, 1)
        probabilities = np.concatenate((water_map, 1 - water_map), axis=1) + norm_constant
        return probabilities / np.sum(probabilities, axis=1, keepdims=True)

    def calculate_prediction(self, image_all_bands: np.ndarray):
        """ Returns the prior probability/likelihood given by the base model under evaluation.

        Parameters
        ----------
        image_all_bands : np.ndarray
            pixel values for all the bands of the target image

        Returns
        -------
        y_pred : np.ndarray
            prior probabilities or likelihood for the currently evaluated model

        Raises
        ------
        ValueError
            if *self.model* is not one of the supported model identifiers
        """
        # Scaled Index Model
        if self.model == "Scaled Index":
            # Calculate spectral index, which has range [-1, 1]
            spectral_index = get_broadband_index(data=image_all_bands,
                                                 bands=Config.bands_spectral_index[Config.scenario])
            # Scale the spectral index to range [0, 1] so that it can be seen as a probability
            y_pred = get_scaled_index(spectral_index=spectral_index, num_classes=self.num_classes)

        # DeepWaterNet Model, used in this project for benchmarking
        elif self.model == "DeepWaterMap":  # first version
            image_dimensions = Config.image_dimensions[Config.scenario]
            # Select only the bands used in the case of this algorithm. The last dimension is the
            # number of selected bands, i.e. len(Config.bands_deepwaternet).
            image_deepwaternet_bands = image_all_bands[:, Config.bands_deepwaternet].reshape(
                image_dimensions['dim_x'], image_dimensions['dim_y'], len(Config.bands_deepwaternet))
            # Values are clipped between 0 and 1
            # NOTE(review): the WatNet scaling factor is applied here as well - confirm this is intended
            image_deepwaternet_bands = np.float32(
                np.clip(image_deepwaternet_bands * Config.scaling_factor_watnet, a_min=0, a_max=1))
            # The model published by the authors is called within the main_deepwater function
            water_map = main_deepwater(checkpoint_path=Config.path_checkpoints_deepwatermap,
                                       image=image_deepwaternet_bands)
            y_pred = self._water_map_to_probabilities(water_map=water_map, norm_constant=Config.norm_constant)

        # WatNet Model, used in this project for benchmarking
        # The WatNet model is an improved version of the DeepWaterNet model provided by their authors
        elif self.model == "WatNet":
            image_dimensions = Config.image_dimensions[Config.scenario]
            # Select only the bands used in the case of this algorithm
            image_watnet_bands = image_all_bands[:, Config.bands_watnet].reshape(
                image_dimensions['dim_x'], image_dimensions['dim_y'], len(Config.bands_watnet))
            # Values are clipped between 0 and 1
            image_watnet_bands = np.float32(
                np.clip(image_watnet_bands * Config.scaling_factor_watnet, a_min=0, a_max=1))
            # The model published by the authors is called within the watnet_infer function
            water_map = watnet_infer_main(rsimg=image_watnet_bands, path_model=Config.path_watnet_pretrained_model)
            y_pred = self._water_map_to_probabilities(water_map=water_map, norm_constant=Config.norm_constant)

        # Logistic Regression Model
        elif self.model == "Logistic Regression":
            # The last column is not considered because it contains the spectral index values, which
            # are not required for the probability prediction
            y_pred = self.trained_model.predict_proba(image_all_bands[:, :-1])

        # GMMs Model
        elif self.model == "GMM":
            # The last column is not considered because it corresponds to the spectral index values
            band_values = image_all_bands[:, :-1]
            # One row per input pixel, so this does not depend on update_labels having run before
            y_pred = np.zeros(shape=[image_all_bands.shape[0], self.num_classes])
            # Calculate the likelihood for each Gaussian Mixture
            # the score_samples function takes into account all the rows
            for label_index in range(len(self.classes)):
                y_pred[:, label_index] = np.exp(self.densities[label_index].score_samples(band_values))
            # Normalize so that the likelihoods of each pixel sum to one
            y_pred = y_pred / np.sum(y_pred, axis=1, keepdims=True)
            y_pred = y_pred.astype('float32')

        else:
            raise ValueError(f"Unknown model identifier: {self.model!r}")

        return y_pred

    def calculate_posterior(self, bands: np.ndarray, y_pred: np.ndarray, pixel_position: int):
        """ Returns the posterior probabilities of one pixel and stores them as the prior of the
        next recursion step.

        Parameters
        ----------
        bands: np.ndarray
            band values for the evaluated pixel (not used in the calculation, kept for interface
            compatibility)
        y_pred : np.ndarray
            prior probabilities/likelihood for each class, indexed as [pixel, class]
        pixel_position: int
            position of the pixel being evaluated (as the function is called row by row)

        Returns
        -------
        posterior : np.ndarray
            array containing the posterior probability for each class
        """
        likelihood = np.asarray(y_pred[pixel_position], dtype=np.float64).reshape(-1)
        if self.model != "GMM":  # TODO: Change this
            # The base model output is divided by the marginal (prior) probability of each class
            weights = likelihood / np.asarray(self.classes_prior, dtype=np.float64)
        else:
            # The GMM likelihoods are only normalized to sum to one
            weights = likelihood / np.sum(likelihood)

        transition_matrix = np.asarray(self.transition_matrix, dtype=np.float64)
        prior_row = self.prior_probability[pixel_position]
        prior = np.asarray(prior_row, dtype=np.float64).reshape(-1)

        # Posterior probability is calculated here. eq1.
        # Inner summation: evidence[j] = sum_k weights[k] * transition_matrix[k, j]
        evidence = weights @ transition_matrix
        # Outer summation: p[i] = weights[i] * sum_j transition_matrix[i, j] * prior[j] / evidence[j]
        unnormalized = weights * (transition_matrix @ (prior / evidence))
        posterior = unnormalized / np.sum(unnormalized)

        # The posterior becomes the prior of this pixel for the next time instant
        self.prior_probability[pixel_position] = posterior.reshape(np.shape(prior_row))
        return posterior
def get_rbc_objects(gmm_densities: List[GaussianMixture], trained_lr_model: LogisticRegression):
    """ Builds one RBC object per model to be evaluated.

    Parameters
    ----------
    gmm_densities : List[GaussianMixture]
        gaussian mixture densities calculated in the training stage
    trained_lr_model : LogisticRegression
        logistic regression (LR) model after training in the training stage

    Returns
    -------
    rbc_objects : Dict[str, RBC]
        dictionary containing one RBC object per model to evaluate, keyed by model identifier
    """
    logging.debug("Creating instance of RBC objects for Gaussian Mixture, Logistic Regression (LR) and Scaled "
                  "Index models")

    # Settings shared by every RBC object, extracted from the configuration file
    scenario = Config.scenario
    shared_settings = dict(
        transition_matrix=Config.transition_matrix[scenario],
        classes=Config.classes[scenario],
        gmm_densities=gmm_densities,
        classes_prior=Config.prior_probabilities[scenario],
    )

    # Each entry is (model identifier, whether the trained LR model is set right after creation)
    models_to_build = [("GMM", False), ("Scaled Index", False), ("Logistic Regression", True)]
    # Benchmark Deep Learning models for the water mapping experiment
    if scenario == "oroville_dam":
        models_to_build.extend([("DeepWaterMap", True), ("WatNet", True)])

    rbc_objects = {}
    for model_name, needs_lr_model in models_to_build:
        rbc_object = RBC(model=model_name, **shared_settings)
        rbc_objects[model_name] = rbc_object
        if needs_lr_model:
            rbc_object.set_lr_trained_model(trained_model=trained_lr_model)

    # Returns the dictionary with type Dict[str, RBC]
    return rbc_objects