-
Notifications
You must be signed in to change notification settings - Fork 3
/
imputer.py
121 lines (111 loc) · 4.36 KB
/
imputer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""Impute missing values with k nearest classifier."""
import sys
import numpy as np
import pandas as pd
from sklearn import neighbors
class Imputer:
    """Impute the missing values of one column with a k-nearest-neighbours model.

    The rows in which the target column is present are used to train a
    ``KNeighborsRegressor`` (continuous target) or ``KNeighborsClassifier``
    (categorical target) on all the other columns; the trained model then
    predicts the target for the rows in which it is missing.
    """

    @staticmethod
    def _fill_values(X, column, is_categorical):
        """Compute one fill value per feature column.

        - Args:
            X(numpy.ndarray): full input data, including the target column
            column(int): index of the target column, excluded from the result
            is_categorical(boolean): use the median if True, else the mean
        - Returns:
            fill_values(numpy.ndarray): NaN-ignoring statistic of every column
                except ``column``, so that entry ``i`` belongs to column ``i``
                of the feature matrix obtained by deleting ``column`` from X
        """
        if is_categorical:
            stats = np.nanmedian(X, 0)
        else:
            stats = np.nanmean(X, 0)
        # Drop the target entry so the vector lines up with the feature
        # matrix; indexing the full-width vector shifts every feature that
        # sits after the target column by one.
        return np.delete(stats, column)

    @staticmethod
    def _fill_missing_features(features, fill_values):
        """Replace missing entries of a feature matrix, in place.

        - Args:
            features(numpy.ndarray): 2-D feature matrix, modified in place
            fill_values(numpy.ndarray): one fill value per feature column
        - Returns:
            features(numpy.ndarray): the same array, for convenience
        """
        mask = pd.isnull(features)
        if mask.any():
            _, col_ids = np.where(mask)
            features[mask] = fill_values[col_ids]
        return features

    def _fit(self, X, column, k=10, is_categorical=False):
        """Fit a knn model that predicts the column to be imputed.

        - Args:
            X(numpy.ndarray): input data
            column(int): column id to be imputed
            k(int): number of nearest neighbors, default 10
            is_categorical(boolean): True trains a classifier, False a
                regressor; it also selects median (True) or mean (False) as
                the fill value for missing entries in the other columns
        - Returns:
            clf: trained k nearest neighbour model, or None when the column
                has no missing values and nothing needs to be imputed
        """
        missing_idxes = np.where(pd.isnull(X[:, column]))[0]
        # Nothing to impute: skip building a model altogether.
        if missing_idxes.size == 0:
            return None
        if is_categorical:
            clf = neighbors.KNeighborsClassifier(n_neighbors=k)
        else:
            clf = neighbors.KNeighborsRegressor(n_neighbors=k)
        # Train only on the rows where the target column is known.
        X_known = np.delete(X, missing_idxes, 0)
        y_train = X_known[:, column]
        X_train = np.delete(X_known, column, 1)
        # Other columns may still contain gaps; fill them with the column
        # statistic so the model can be fitted.
        fill_values = self._fill_values(X, column, is_categorical)
        self._fill_missing_features(X_train, fill_values)
        clf.fit(X_train, y_train)
        return clf

    def _transform(self, X, column, clf, is_categorical):
        """Impute the missing values of one column with a trained model.

        - Args:
            X(numpy.ndarray): input numpy ndarray, modified in place
            column(int): index of column to be imputed
            clf: pretrained model exposing ``predict``
            is_categorical(boolean): selects median (True) or mean (False) as
                the fill value for missing entries in the other columns
        - Returns:
            X(numpy.ndarray): the same array with the column imputed
        """
        missing_idxes = np.where(pd.isnull(X[:, column]))[0]
        # Predicting on zero rows is an error for most models; bail out.
        if missing_idxes.size == 0:
            return X
        # Fancy indexing and np.delete both copy, so X itself is untouched
        # until the predictions are written back below.
        X_test = np.delete(X[missing_idxes, :], column, 1)
        fill_values = self._fill_values(X, column, is_categorical)
        self._fill_missing_features(X_test, fill_values)
        X[missing_idxes, column] = clf.predict(X_test)
        return X

    def knn(self, X, column, k=10, is_categorical=False):
        """Impute missing value with knn.

        - Args:
            X(numpy.ndarray/pandas.dataframe): input data; an ndarray is
                imputed in place, a dataframe is converted to a new ndarray
            column(str/int): column name or column index to be imputed
            k(int): number of nearest neighbors, default 10
            is_categorical(boolean): is continuous or categorical feature
        - Returns:
            X_imputed(numpy.ndarray): data with the column imputed; returned
                unchanged when the column has no missing values
        """
        X, column = self._check_X_y(X, column)
        clf = self._fit(X, column, k, is_categorical)
        if clf is None:
            return X
        return self._transform(X, column, clf, is_categorical)

    def _check_X_y(self, X, column):
        """Check input, if pandas.dataframe, transform to numpy array.

        - Args:
            X(ndarray/pandas.dataframe): input instances
            column(str/int): column index or column name
        - Returns:
            X(ndarray): input instances; a writable copy when a dataframe
                was given, otherwise the object that was passed in
            column_idx(int): positional index of the column
        """
        if not isinstance(X, pd.DataFrame):
            return X, column
        column_idx = column
        if isinstance(column, str):
            # Translate the column label into its positional index.
            column_idx = X.columns.get_loc(column)
        # DataFrame.as_matrix() no longer exists in current pandas. Copy the
        # values so the result is writable (pandas copy-on-write hands out
        # read-only views) and the caller's dataframe is left untouched.
        return np.array(X.values), column_idx