-
Notifications
You must be signed in to change notification settings - Fork 1
/
estimator.py
347 lines (291 loc) · 11.3 KB
/
estimator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
'''
This module implements label estimation techniques (EM and majority voting)
and an observation generator that simulates crowdsourcing task data for testing.
'''
import numpy as np
import pandas as pd
import math
import itertools
import time
class ObservationGenerator(object):
    ''' Generate an observation dataframe of objects x workers. Based on the framework by Nguyen et al. 2013.

    Attributes
        obs: Generated observation dataframe (index=objects, columns=workers)
        true_labels: True label of each object, drawn uniformly at random
        workers_types: Sampled type of each worker: expert, normal, random spammer, sloppy, or uniform spammer
        r: Reliability range (min, max fraction of correctly answered objects) per worker type;
           None for uniform spammers, who assign one fixed label to every object
        dist: Worker type distribution

    References
        Q. V. H. Nguyen et al. (2013) An Evaluation of Aggregation Techniques in Crowdsourcing. In Proc. of WISE.
    '''
    def __init__(self, objects, workers, labels, dist):
        ''' Generate observations according to specific parameters

        Args
            objects: Sequence of indices representing objects
            workers: Sequence of indices representing workers
            labels: Sequence of indices representing labels
            dist: Worker type distribution [p_expert, p_normal, p_random, p_sloppy, p_uniform],
                  or None for a uniform distribution over the five types

        Raises
            Exception: if dist is given and its probabilities do not sum to 1
        '''
        self.obs = pd.DataFrame(index=objects, columns=workers)
        # One true label per object, uniform over the label set
        self.true_labels = np.random.randint(max(labels)+1, size=len(objects))
        worker_types = ['expert', 'normal', 'random', 'sloppy', 'uniform']
        self.r = {'expert': (0.9, 1), 'normal': (0.6, 0.9), 'random': (0.4, 0.6), 'sloppy': (0.1, 0.4), 'uniform': None}
        if dist is None:
            # Default: equal probability for each worker type
            self.dist = [0.2, 0.2, 0.2, 0.2, 0.2]
        else:
            # The original check int(sum(dist)) > 1 truncated the sum and so
            # silently accepted distributions summing to less than 1; use a
            # tolerance-based check instead.
            if abs(sum(dist) - 1.0) > 1e-9:
                raise Exception('Probabilities do not sum up to 1')
            self.dist = dist
        self.workers_types = np.random.choice(worker_types, len(workers), p=self.dist)
        # Positional enumeration instead of indexing by worker/object id, so
        # non-0-based index sets also work (identical behavior for 0-based ranges).
        for pos_j, j in enumerate(self.obs.columns):
            w = self.workers_types[pos_j]
            if w != 'uniform':
                # Number of correctly answered objects is proportional to a
                # reliability drawn from the worker type's range
                num_obj = len(objects) * np.random.uniform(self.r[w][0], self.r[w][1])
                correct_positions = set(np.random.permutation(np.arange(len(objects)))[:int(num_obj)].tolist())
                for pos_i, i in enumerate(self.obs.index):
                    if pos_i in correct_positions:
                        self.obs.loc[i, j] = self.true_labels[pos_i]
                    else:
                        # Pick a wrong label uniformly among the remaining ones
                        wrong = list(labels)
                        wrong.remove(self.true_labels[pos_i])
                        self.obs.loc[i, j] = np.random.choice(wrong, 1)[0]
            else:
                # Uniform spammer: one fixed label for every object
                l = np.random.choice(labels, 1)[0]
                self.obs[j] = self.obs[j].fillna(value=l)
class LabelEstimator(object):
    ''' Estimate true labels from a set of observations

    Attributes
        T: Estimated labels dataframe (index=object ID, columns=label ID)
    Attributes specific to the EM algorithm
        em_p: Estimated class priors (one entry per label)
        em_pi: Estimated error rates (MultiIndex (k=worker, j=correct label, l=assigned label))
        em_C: Estimated workers' cost (one entry per worker, formatted as '%.4f' strings)

    References
        A. P. Dawid and A. M. Skene (1979) Maximum Likelihood Estimation of Observer Error-Rates Using the EM Algorithm. Journal of the Royal Statistical Society. Series C (Applied Statistics), 28:1, pp. 20-28
        P. G. Ipeirotis et al. (2010) Quality Management on Amazon Mechanical Turk. In Proc. of HCOMP.
    '''
    def __init__(self, observations, objects, workers, labels, true_estimates=None):
        ''' Instantiate the estimator with specific parameters

        Args
            observations: DataFrame of observations (index=object, columns=worker, values=assigned label)
            objects: Sequence of indices representing objects
            workers: Sequence of indices representing workers
            labels: Sequence of indices representing labels
            true_estimates: Known correct labels (optional); rows of the estimate are
                clamped to these values during EM
        '''
        self.observations = observations
        self.objects = objects
        self.workers = workers
        self.labels = labels
        # None is already the documented default, so a plain assignment suffices
        self.true_estimates = true_estimates
        # Class priors (p), error rates (pi), estimated labels (T), and worker
        # costs (C) are populated at the end of the EM steps
        self.T = None
        self.em_p = None
        self.em_pi = None
        self.em_C = None
        # Generate n^k_il: k = workers, i = objects, l = labels
        self.n = self._workers_objects_labels()

    def _workers_objects_labels(self):
        ''' Generate n^k_il assuming that each worker labels an object at most once

        Returns
            Series with MultiIndex (k=worker, i=object, l=assigned label), value 1
            for every observed (worker, object, label) triple
        '''
        tuples = []
        for i in self.objects:
            for k in self.workers:
                tuples.append((k, i, self.observations.loc[i, k]))
        n = pd.Series(index=pd.MultiIndex.from_tuples(tuples, names=['k', 'i', 'l']))
        # Each observed triple counts exactly once
        n = n.fillna(1)
        return n

    def _init_estimates(self):
        ''' Initialize T_ij = \\sum_k{n^k_il}/sum_k{sum_k{n^k_il}} (eq. 3.1 in Dawid and Skene 1979)
        This is the same as the majority voting estimate

        Returns
            DataFrame of initial label estimates (index=object, columns=label)
        '''
        T_ij = pd.DataFrame(index=self.objects, columns=self.labels)
        for i in T_ij.index:
            for j in T_ij.columns:
                # Fraction of workers that assigned label j to object i
                T_ij.loc[i, j] = round(float(len(self.observations.loc[i][self.observations.loc[i]==j]))/len(self.observations.columns), 4)
        return T_ij

    def _class_priors(self, T_ij):
        ''' Compute marginal probabilities p_j = \\sum_i{T_ij}/|I| (eq. 2.4 in Dawid and Skene 1979)

        Args
            T_ij: Estimated correct labels
        Returns
            List of estimated class priors, one per label
        '''
        p_j = []
        for j in T_ij.columns:
            p_j.append(float(T_ij[j].sum())/len(T_ij.index))
        return p_j

    def _error_rates(self, T_ij):
        ''' Compute error-rates pi^k_jl = \\sum_i{T_ij*n^k_il}/sum_l{sum_i{T_ij*n^k_il}} (eq. 2.3 in Dawid and Skene 1979)

        Args
            T_ij: Estimated correct labels
        Returns
            Series of estimated error-rates with MultiIndex (k, j, l)
        '''
        tuples = [x for x in itertools.product(*[self.workers, self.labels, self.labels])]
        pi = pd.Series(index=pd.MultiIndex.from_tuples(tuples, names=['k', 'j', 'l']))
        pi_nom = pd.Series(index=pd.MultiIndex.from_tuples(tuples, names=['k', 'j', 'l']))
        tuples = [x for x in itertools.product(*[self.workers, self.labels])]
        pi_denom = pd.Series(index=pd.MultiIndex.from_tuples(tuples, names=['k', 'j']))
        for k in set(self.n.index.get_level_values('k')):
            for j in T_ij.columns:
                denom = 0
                for l in set(self.n.index.get_level_values('l')):
                    nom = 0
                    for i in set(self.n.index.get_level_values('i')):
                        # Only observed (k, i, l) triples contribute
                        if (k, i, l) in self.n.index:
                            nom += (T_ij.loc[i, j] * self.n.loc[k, i, l])
                    pi_nom.loc[k, j, l] = nom
                    denom += nom
                pi_denom.loc[k, j] = denom
        for k, j, l in pi.index:
            pi.loc[k, j, l] = pi_nom.loc[k, j, l]/pi_denom.loc[k, j]
        return pi

    def _em_estimator(self, T_hat=None, iteration=None, prev_loglik=None, threshold=0.001, max_iteration=50, verbose=False):
        ''' Estimate correct labels using the EM algorithm (recursive: one call per iteration)

        Args
            T_hat: Starting estimates (None on the first call)
            iteration: Current iteration (None on the first call)
            prev_loglik: Log-likelihood of the previous iteration
            threshold: Relative log-likelihood change used as the termination criterion
            max_iteration: Maximum number of iterations
            verbose: Print per-iteration progress when True
        Returns
            DataFrame of estimated correct labels
        '''
        t0 = time.time()
        # Step 1: Initialize T_ij = \sum_k{n^k_il}/sum_k{sum_k{n^k_il}}
        if T_hat is None:
            T_hat = self._init_estimates()
        # Clamp any known true labels so EM cannot drift away from them
        if self.true_estimates is not None:
            for i in self.true_estimates.index:
                T_hat.loc[i] = self.true_estimates.loc[i]
        if iteration is None:
            iteration = 0
        else:
            iteration += 1
        # Step 2: Compute marginal probability p_j and error-rates pi^k_jl
        p = self._class_priors(T_hat)
        pi = self._error_rates(T_hat)
        # Step 3: Re-compute T_ij (eq. 2.5 in Dawid and Skene 1979)
        # p(T_ij=1 | data) = (p_j * \product_k{\product_l{pi^k_jl ** n^k_il}}) / (\sum_q{p_q * \product_k{\product_l{pi^k_ql ** n^k_il}}})
        T = pd.DataFrame(index=self.objects, columns=self.labels)
        tuples = [x for x in itertools.product(*[T.index, T.columns])]
        T_nom = pd.Series(index=pd.MultiIndex.from_tuples(tuples, names=['i', 'j']))
        # Keyed by object id (the original list assumed 0-based contiguous objects)
        T_denom = {}
        # Incomplete-data log-likelihood (eq. 2.7 in Dawid and Skene 1979):
        # sum_i log(sum_j p_j * prod_k prod_l pi^k_jl ** n^k_il).
        # The previous implementation accumulated running sums (x1/x2) that were
        # never reset per (i, j), producing an incorrect value.
        loglik = 0
        for i in T.index:
            denom = 0
            for j in T.columns:
                nom = 1
                for k in set(pi.index.get_level_values('k')):
                    for l in set(pi.index.get_level_values('l')):
                        if (k, i, l) in self.n.index:
                            nom *= math.pow(pi.loc[k, j, l], self.n.loc[k, i, l])
                nom *= p[j]
                T_nom.loc[i, j] = nom
                denom += nom
            T_denom[i] = denom
            if denom > 0:
                loglik += math.log(denom)
        for i, j in T_nom.index:
            T.loc[i, j] = T_nom.loc[i, j]/T_denom[i]
        self.T = T
        self.em_p = p
        self.em_pi = pi
        if iteration == 0:
            # First pass only establishes the baseline log-likelihood
            return self._em_estimator(T, iteration, loglik, threshold, max_iteration, verbose)
        elif iteration >= max_iteration:
            return T
        t1 = time.time()
        # Step 4: If converged, terminate; otherwise repeat steps 2 and 3
        scale = math.fabs(loglik)
        if scale == 0:
            # Avoid division by zero; treat a zero log-likelihood as its own scale
            scale = 1.0
        diff = round(math.fabs(loglik-prev_loglik)/scale, 4)
        if diff > threshold:
            if verbose:
                print('iteration %s/%s: loglik=%.4f, prev_loglik=%.4f, diff_pct=%.4f, time=%.4f sec' % (iteration, max_iteration, loglik, prev_loglik, diff, (t1-t0)))
            return self._em_estimator(T, iteration, loglik, threshold, max_iteration, verbose)
        if verbose:
            print('iteration %s/%s: loglik=%.4f, prev_loglik=%.4f, diff_pct=%.4f, time=%.4f sec' % (iteration, max_iteration, loglik, prev_loglik, diff, (t1-t0)))
        return T

    def _mv_estimator(self):
        ''' Estimate correct labels using majority voting T_ij = \\sum_k{n^k_il}/sum_k{sum_k{n^k_il}}

        Returns
            DataFrame of estimated correct labels
        '''
        T = pd.DataFrame(index=self.objects, columns=self.labels)
        for i in T.index:
            for j in T.columns:
                T.loc[i, j] = round(float(len(self.observations.loc[i][self.observations.loc[i]==j]))/len(self.observations.columns), 4)
        # Known true labels override the voted estimates
        if self.true_estimates is not None:
            for i in self.true_estimates.index:
                T.loc[i] = self.true_estimates.loc[i]
        self.T = T
        return T

    def _cost(self, i, j):
        ''' Naive 0/1 cost function: 0 when the labels agree, 1 otherwise '''
        if i == j:
            return 0
        else:
            return 1

    def _workers_cost(self):
        ''' Estimate the expected cost of each worker into self.em_C (Ipeirotis et al. 2010)
        Perfect workers have zero cost while random workers/spammers have high cost;
        Ipeirotis et al. consider workers with cost < 0.5 to be good quality.
        NOTE(review): the soft-label vector is indexed positionally by label, so this
        assumes labels are 0..|L|-1 — confirm before using sparse label ids.
        '''
        wcost = []
        # Estimate workers' probability of assigning each label (eq. 2)
        X = pd.DataFrame(index=self.workers, columns=self.labels)
        X = X.fillna(0)
        for k in X.index:
            for l in X.columns:
                for j in self.labels:
                    X.loc[k, l] += self.em_pi.loc[k, j, l] * self.em_p[j]
        # For each worker k and label l, estimate the soft label vector (eq. 1)
        # and the worker's expected cost (eq. 3)
        for k in self.workers:
            c = 0
            for l in self.labels:
                soft = []
                for j in self.labels:
                    soft.append((self.em_pi.loc[k, j, l] * self.em_p[j])/X.loc[k, l])
                c_soft = 0
                for i in self.labels:
                    for j in self.labels:
                        # NaNs arise from 0/0 error rates; skip those terms
                        if not math.isnan(soft[i]) and not math.isnan(soft[j]):
                            c_soft += soft[i] * soft[j] * self._cost(i, j)
                c += c_soft * X.loc[k, l]
            wcost.append('%.4f' % c)
        self.em_C = wcost

    def estimate(self, method, max_iteration=None, verbose=False):
        ''' Estimate correct labels of objects using a specific algorithm

        Args:
            method: 'em' for the EM algorithm, 'mv' for majority voting
            max_iteration: Maximum number of iterations for the EM algorithm
            verbose: Print EM progress when True
        Returns
            DataFrame of estimated correct labels
        Raises
            Exception: when method is not 'em' or 'mv'
        '''
        if method=='em':
            if max_iteration is not None:
                estimators = self._em_estimator(None, max_iteration=max_iteration, verbose=verbose)
            else:
                estimators = self._em_estimator(None, verbose=verbose)
            # Estimate the worker cost as a by-product of the EM parameters
            self._workers_cost()
            return estimators
        elif method=='mv':
            return self._mv_estimator()
        else:
            raise Exception('Error: Unrecognized method "'+method+'"')