#!/usr/bin/env python
# encoding: utf-8
"""
NaiveBayesClassifier.py
Created by Andrew John Tulloch on 2010-05-15.
Copyright (c) 2010 Andrew Tulloch. All rights reserved.
"""
import sys
import os
import csv
from utils import *
import math
import random
#------------------------------------------------------------------------------
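
# NOTE: `mean`, `sd` and `norm_dist` used below are expected to come from the
# local `utils` module (not shown here). `norm_dist(x, mu, sigma)` is presumed
# to evaluate the normal probability density
#     f(x) = 1 / (sigma * sqrt(2 * pi)) * exp(-(x - mu)**2 / (2 * sigma**2)),
# which is what bayes_probability relies on.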
class NaiveBayesClassifier():
    """Naive Bayes Classifier class

    Implements the methods:
        csv_read              - reads a data file
        cosine_normalisation  - cosine-normalises the tf-idf scores
        train                 - trains on a set of messages
        feature_class_mean_sd - calculates the mean and sd of FEATURE
                                when CLASS = SPAM CLASS
        classify              - classifies a message
        bayes_probability     - calculates the probability that a message
                                is spam or not spam
        classification_test   - tests if a message is correctly classified
        stratification_test   - performs 10-fold cross-validation"""
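
    # In effect the class implements a Gaussian naive Bayes rule over the
    # selected features: a message d is assigned to the class
    #     argmax_{c in {Spam, Not Spam}}  P(c) * prod_k P(tf-idf_k(d) | c),
    # where each P(tf-idf_k | c) is modelled as a normal distribution whose
    # mean and sd are estimated from the training messages of class c
    # (see train, feature_class_mean_sd and bayes_probability below).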

    def __init__(self, corpus):
        self.type = corpus    # Type of corpus - body or subject
        self.corpus_header, self.corpus_data = self.csv_read(corpus)
        self.corpus_data = self.cosine_normalisation()
        # Reads the corpus data and cosine-normalises the tf-idf scores

    def csv_read(self, corpus):
        """Reads a CSV file. Outputs two lists:
            corpus_header     - a list of headers
            corpus_float_data - a list of messages"""
        corpus_data = []
        corpus_file = self.type + ".csv"    # e.g. subject.csv
        reader = csv.reader(open(corpus_file))
        for row in reader:
            # Scans through the rows, appending each row to the list
            corpus_data.append(row)

        corpus_header = corpus_data[:1]    # Header data "f1, f2, ..."
        corpus_data = corpus_data[1:]      # Message data with tf-idf scores

        corpus_float_data = []
        for row in corpus_data:
            # Converts the score strings to floats, keeping the class label
            float_row = [float(i) for i in row[:-1]]
            float_row.append(row[-1])
            corpus_float_data.append(float_row)
        return corpus_header, corpus_float_data
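
    # Expected corpus layout (inferred from this file, so treat it as an
    # assumption): "subject.csv" and "body.csv" hold a header row
    # "f1, ..., f200", then one row per message containing 200 tf-idf scores
    # followed by a "Spam" or "Not Spam" label; stratification_test below
    # assumes 600 messages per corpus.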

    def cosine_normalisation(self):
        """Performs cosine normalisation of the tf-idf data, rescaling each
        message's scores by 1 / sqrt(sum_k tf-idf(t_k, d_j)^2)"""
        self.normalised_data = []
        for message in self.corpus_data:
            normalised_scores = []
            tf_idf_scores = message[:-1]
            normalisation_factor = math.sqrt(sum([i**2 for i in tf_idf_scores]))
            # Calculate sqrt(\sum_k tf-idf(t_k, d_j)^2)
            if normalisation_factor == 0:
                # Prevents dividing by zero
                self.normalised_data.append(message)
            else:
                for score in tf_idf_scores:
                    normalised_scores.append(score / float(normalisation_factor))
                normalised_scores.append(message[-1])
                self.normalised_data.append(normalised_scores)
        return self.normalised_data

    def train(self, training_set):
        """Trains the classifier by calculating the normal distribution
        parameters of each feature for the Spam and Not Spam classes,
        along with the a-priori class probabilities"""
        training_messages = [self.corpus_data[i] for i in training_set]
        # The set of training messages

        self.mean_sd_data = {}
        # Dictionary holding the mean and sd data
        for feature in range(200):
            # Initialise the dictionary
            self.mean_sd_data[feature] = {"Not Spam": [0, 0], "Spam": [0, 0]}
            for spam_class in ["Not Spam", "Spam"]:
                self.mean_sd_data[feature][spam_class] = []

        for feature in range(200):
            for spam_class in ["Not Spam", "Spam"]:
                # Fill the dictionary with values calculated by the
                # feature_class_mean_sd method
                self.mean_sd_data[feature][spam_class] = self.feature_class_mean_sd(
                    spam_class, feature, training_messages)

        # Calculate the a-priori spam and not-spam probabilities
        spam_count = 0
        for message in training_messages:
            if message[-1] == "Spam":
                spam_count += 1
        self.mean_sd_data["Spam"] = spam_count / float(len(training_set))
        self.mean_sd_data["Not Spam"] = 1 - (spam_count / float(len(training_set)))

    def feature_class_mean_sd(self, spam_class, feature, training_messages):
        """Calculates the mean and standard deviation of FEATURE
        when CLASS = SPAM CLASS"""
        feature_list = []
        for message in training_messages:
            # Loop through all messages
            if spam_class == message[-1]:
                # If the message is in the right class, take the
                # corresponding feature's tf-idf score
                feature_list.append(message[feature])
        # Return the summary statistics of the relevant feature / class
        return [mean(feature_list), sd(feature_list)]

    def classify(self, message):
        """Classify a message as spam or not spam"""
        p_spam = self.bayes_probability(message, "Spam")
        # Probability that the message is spam
        p_not_spam = self.bayes_probability(message, "Not Spam")
        # Probability that the message is not spam
        if p_spam > p_not_spam:
            return "Spam"        # Message is spam
        else:
            return "Not Spam"    # Message is not spam

    def bayes_probability(self, message, spam_class):
        """Calculates the (unnormalised) probability that a message
        is or is not spam"""
        a_priori_class_probability = self.mean_sd_data[spam_class]
        # A-priori probability that a single message is spam or not spam,
        # i.e. P(spam_class)
        class_bayes_probability = a_priori_class_probability

        body_best_features = [6, 8, 11, 34, 35, 45, 48, 117, 124, 134, 141, 174]
        # Feature selection from WEKA
        subject_best_features = range(1, 200)
        # Features f1 to f199 for the subject corpus

        if self.type == "body":
            # Converts the features f1, f2, ..., fn into Python list indices
            best_features = map(lambda x: x - 1, body_best_features)
        else:
            best_features = map(lambda x: x - 1, subject_best_features)

        for feature in best_features:
            # For each selected feature
            message_tf_idf_score = message[feature]
            # The message's tf-idf value
            tf_idf_mean = self.mean_sd_data[feature][spam_class][0]
            tf_idf_sd = self.mean_sd_data[feature][spam_class][1]
            # Parameters of the normal distribution governing this
            # feature within this class
            prob_feature_given_class = norm_dist(message_tf_idf_score, tf_idf_mean, tf_idf_sd)
            # The probability P(tf-idf_feature = score | msg_class = spam_class)
            class_bayes_probability = class_bayes_probability * prob_feature_given_class
            # Multiply together to obtain the total probability,
            # as per the Naive Bayes independence assumption
        return class_bayes_probability
        # Our probability that the message is spam or not spam

    def classification_test(self, message):
        """Tests if a message is correctly classified"""
        if self.classify(message) == message[-1]:
            return True
        else:
            return False

    def stratification_test(self):
        """Performs 10-fold stratified cross validation"""
        already_tested = []
        test_set = []
        for i in range(10):
            # Create the set of ten sixty-element random bins
            sample = random.sample(
                [j for j in range(600) if j not in already_tested], 60)
            already_tested.extend(sample)
            test_set.append(sample)

        results = []
        for validation_data in test_set:
            # Create the training set (540 elements) and the
            # validation data (60 elements)
            training_sets = [training_set for training_set in test_set
                             if training_set is not validation_data]
            training_data = []
            for training_set in training_sets:
                training_data.extend(training_set)
            self.train(training_data)
            # Train the probabilities of the Bayes filter

            count = 0
            for index in validation_data:
                # Calculate the percentage of successful classifications
                if self.classification_test(self.corpus_data[index]):
                    count += 1
            results.append(float(count) / len(validation_data))
        return results
#------------------------------------------------------------------------------
def print_results(results):
    """Formats results and prints them, along with a summary statistic"""
    for index, result in enumerate(results):
        print "Stratification Set {0} \t {1:.1f}% classified correctly.".format(index + 1, result * 100)
    print "##" * 30
    print "\n\tOverall Accuracy is {0:.1f}%".format(mean(results) * 100)

if __name__ == '__main__':
    import random
    random.seed(18)
    # Sets the seed, for result reproducibility

    test = NaiveBayesClassifier("subject")
    print "\tTesting Subject Corpus"
    print "##" * 30
    results = test.stratification_test()
    print_results(results)
    print

    print "\tTesting Body Corpus"
    print "##" * 30
    test = NaiveBayesClassifier("body")
    results = test.stratification_test()
    print_results(results)
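
# Usage note: this script appears to target Python 2 (print statements) and,
# as written, expects subject.csv and body.csv to be in the working directory:
#     python BayesClassifier.py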