-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.py
123 lines (92 loc) · 3.82 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import argparse
import pandas as pd
from lem2 import DecisionTable
def negate(function):
    """Build a callable that returns the ``~``-inverted result of ``function``.

    The wrapper forwards every positional and keyword argument untouched and
    applies the ``~`` operator to whatever ``function`` returns.
    """
    def inverted(*positional, **named):
        outcome = function(*positional, **named)
        return ~outcome

    return inverted
def positive_indices(series):
    """Return the positions (0-based) of the truthy elements of ``series``."""
    hits = []
    for position, flag in enumerate(series):
        if flag:
            hits.append(position)
    return hits
def lower_bound(df, decision, attributes):
    """Select the rows in the lower approximation of the positive decision.

    A row is selected when ``decision`` is true for it and there is no row
    with a negative decision that has equal values in every column listed
    in ``attributes``.

    Args:
        df: pandas DataFrame holding the data table.
        decision: callable that takes ``df`` and returns a boolean Series
            with one entry per row (the ``~`` operator is applied to it).
        attributes: column labels used to decide whether two rows carry
            the same information.

    Returns:
        Boolean ``pd.Series`` indexed like ``df``; True marks selected rows.
    """
    positive = decision(df)
    negative = ~positive

    def has_conflicting_negative(row) -> bool:
        """Tell whether some negative row equals ``row`` on all attributes."""
        matches = negative
        for attr in attributes:
            matches = matches & (df[attr] == row[attr])
        return bool(matches.any())

    # Rows are fetched by position, but the result is built on df's own index,
    # so frames with a non-default index (filtered, re-indexed, ...) do not
    # suffer from label misalignment.
    # A positive row is never part of `negative`, hence the row does not have
    # to be masked out of its own comparison; negative rows are skipped
    # because they can never be selected.
    selected = [
        bool(is_positive) and not has_conflicting_negative(df.iloc[pos])
        for pos, is_positive in enumerate(positive)
    ]
    return pd.Series(selected, index=df.index, dtype=bool)
def neg_upper_bound(df, decision, attributes):
    """Mark the rows lying outside the upper bound of the positive decision.

    This is computed as the lower bound of the negated decision: rows with
    a negative decision for which no positive-decision row has the same
    values in ``attributes``.
    """
    inverted_decision = negate(decision)
    return lower_bound(df, inverted_decision, attributes)
def process_subset(df, keys, subset_ids) -> set:
    """Feed the table to a ``DecisionTable`` and collect the attributes used by its rules.

    Every row of ``df`` (restricted to the ``keys`` columns) is inserted into
    a fresh ``DecisionTable``; rules are then requested for ``subset_ids`` and
    the attributes used by those rules are returned.

    Args:
        df: pandas DataFrame with the data table.
        keys: column labels passed to ``DecisionTable`` and used to slice ``df``.
        subset_ids: object identifiers handed to ``getRulesForObjects``
            (callers in this file pass row positions).

    Returns:
        Whatever ``DecisionTable.extractUsedAttributes`` yields for the
        generated rules (annotated as a set).
    """
    table = DecisionTable(keys)
    for record in df[keys].to_dict(orient='records'):
        table.insertObject(record)

    print("Lem2 output:")
    induced_rules = table.getRulesForObjects(subset_ids, verbose=True)
    used_attributes = DecisionTable.extractUsedAttributes(induced_rules, verbose=True)
    return used_attributes
def process_df(df, decision):
    """Extract the attributes used by the rules of both approximation subsets.

    All columns except the last one are treated as attributes. Rules are
    generated separately for the lower bound of the positive decision and
    for the complement of its upper bound, and the attributes used by
    either rule set are merged.

    Args:
        df: pandas DataFrame with the data table.
        decision: decision function; takes ``df`` and returns a boolean
            Series with one entry per row.

    Returns:
        Set with the union of the attributes reported for both subsets.
    """
    attributes = df.columns.tolist()[:-1]

    lower_mask = lower_bound(df, decision, attributes)
    outside_upper_mask = neg_upper_bound(df, decision, attributes)
    lower_ids = positive_indices(lower_mask)
    outside_upper_ids = positive_indices(outside_upper_mask)

    print('\nProcessing the lower bound subset')
    print('(which attributes come with good results)')
    lower_attrs = process_subset(df, attributes, lower_ids)

    print('\nProcessing the upper bound complement subset')
    print('(which attributes come with bad results)')
    outside_upper_attrs = process_subset(df, attributes, outside_upper_ids)

    merged = set()
    merged.update(lower_attrs)
    merged.update(outside_upper_attrs)
    return merged
def parse_args():
    """Parse the command-line options of the script.

    Returns:
        ``argparse.Namespace`` with the attributes ``input``, ``column_idx``,
        ``decision_lambda`` and ``bool``.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ('--input', dict(type=str, default='dataset/student-mat.csv',
                         help='data table input')),
        ('--column-idx', dict(type=int, default=-1,
                              help='decision column index')),
        ('--decision-lambda', dict(default="lambda df: df['G3'] > 10",
                                   help='decision lambda(exprert opinion)')),
        ('--bool', dict(action='store_true', default=False,
                        help='whether decision column contains bool value')),
    )

    cli = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
if __name__ == "__main__":
    # Script entry point: load the table, build the decision function,
    # and print the union of attributes used by the generated rules.
    args = parse_args()

    df_path = args.input
    df = pd.read_csv(df_path)
    keys = df.keys().tolist()
    print(df)

    # Despite its name this holds the *label* of the decision column, looked
    # up through the positional --column-idx option.
    # NOTE(review): process_df always treats every column but the last one as
    # an attribute, so a --column-idx other than -1 is not excluded there.
    column_idx = keys[args.column_idx]

    def bool_lambda(df):
        """Decision for --bool: true where the column reads true/yes/tak."""
        # read_csv may already have parsed a True/False column into a bool
        # dtype, for which the .str accessor raises AttributeError; casting
        # to str first handles both text and bool columns.
        return df[column_idx].astype(str).str.lower().isin(['true', 'yes', 'tak'])

    # SECURITY: --decision-lambda is evaluated as arbitrary Python code.
    # Only run this script with expressions from a trusted source.
    decision = bool_lambda if args.bool else eval(args.decision_lambda)

    # Limit the number of rows for processing
    df = df.iloc[:200]
    print(f'Number of entries: {len(df)}')

    s = sum(decision(df))
    print(f'Number of entries for which the positive decision has been made: {s}')

    res = process_df(df, decision)
    print('\nUnion of attributes:')
    print(res)