# frequent_pattern_growth.py
"""
The Frequent Pattern Growth algorithm (FP-Growth) is a widely used data mining
technique for discovering frequent itemsets in large transaction databases.
It overcomes some of the limitations of traditional methods such as Apriori by
efficiently constructing the FP-Tree, a prefix tree of the transactions, and
mining that tree for frequent itemsets.

Reference: https://athena.ecs.csus.edu/~mei/associationcw/FpGrowth.html
Examples: https://www.javatpoint.com/fp-growth-algorithm-in-data-mining
"""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class TreeNode:
    """
    A single node of a Frequent Pattern tree.

    Attributes:
        name: Label of the item stored in this node.
        count: Occurrence counter of the node.
        parent: The node directly above this one; None for a root node.
        children: Child nodes, keyed by item name. Every instance gets its
            own fresh dictionary.
        node_link: Reference to another node, None by default. The rest of
            this module uses it to chain nodes together.

    Example:
    >>> parent = TreeNode("Parent", 1, None)
    >>> child = TreeNode("Child", 2, parent)
    >>> child.name
    'Child'
    >>> child.count
    2
    >>> child.inc(3)
    >>> child.count
    5
    """

    name: str
    count: int
    parent: TreeNode | None = None
    children: dict[str, TreeNode] = field(default_factory=dict)
    node_link: TreeNode | None = None

    def __repr__(self) -> str:
        # The parent is rendered with its own repr, so the text shows the
        # whole chain of ancestors; children and node_link are left out.
        return f"TreeNode({self.name!r}, {self.count!r}, {self.parent!r})"

    def inc(self, num_occur: int) -> None:
        """Raise the occurrence counter by ``num_occur``."""
        self.count = self.count + num_occur

    def disp(self, ind: int = 1) -> None:
        """Print this node and its subtree, one extra space per tree level."""
        print(f"{' ' * ind} {self.name} {self.count}")
        next_level = ind + 1
        for child_name in self.children:
            self.children[child_name].disp(next_level)
def create_tree(data_set: list, min_sup: int = 1) -> tuple[TreeNode, dict]:
    """
    Create Frequent Pattern tree

    Every transaction is reduced to its frequent items, and those items are
    inserted into the tree in one fixed global order: descending support,
    with equally frequent items kept in the order in which they were first
    seen in ``data_set``. Using the same order for every transaction ensures
    that transactions sharing the same frequent items share one path in the
    tree, instead of being spread over several branches.

    Args:
        data_set: A list of transactions, where each transaction is a list of items.
            It is iterated twice, so it must not be a one-shot iterator.
        min_sup: The minimum support threshold.
            Items with support less than this will be pruned. Default is 1.

    Returns:
        The root of the FP-Tree.
        header_table: The header table dictionary with item information.
            Each entry has the form ``[[support, None], first_node]`` where
            ``first_node`` is the first tree node created for that item.
            An empty dictionary is returned when no item reaches ``min_sup``.

    Example:
    >>> data_set = [
    ...    ['A', 'B', 'C'],
    ...    ['A', 'C'],
    ...    ['A', 'B', 'E'],
    ...    ['A', 'B', 'C', 'E'],
    ...    ['B', 'E']
    ... ]
    >>> min_sup = 2
    >>> fp_tree, header_table = create_tree(data_set, min_sup)
    >>> fp_tree
    TreeNode('Null Set', 1, None)
    >>> len(header_table)
    4
    >>> header_table["A"]
    [[4, None], TreeNode('A', 4, TreeNode('Null Set', 1, None))]
    >>> header_table["E"][1]  # doctest: +NORMALIZE_WHITESPACE
    TreeNode('E', 1, TreeNode('B', 3, TreeNode('A', 4, TreeNode('Null Set', 1, None))))
    >>> sorted(header_table)
    ['A', 'B', 'C', 'E']
    >>> fp_tree.name
    'Null Set'
    >>> sorted(fp_tree.children)
    ['A', 'B']
    >>> fp_tree.children['A'].name
    'A'
    >>> sorted(fp_tree.children['A'].children)
    ['B', 'C']

    Equally frequent items are ordered the same way in every transaction:
    >>> tie_tree, _ = create_tree([['A', 'B'], ['B', 'A']], 1)
    >>> sorted(tie_tree.children)
    ['A']
    >>> tie_tree.children['A'].children['B'].count
    2
    """
    # First pass: count every occurrence of every item.
    header_table: dict = {}
    for trans in data_set:
        for item in trans:
            header_table[item] = header_table.get(item, [0, None])
            header_table[item][0] += 1

    # Prune the items that do not reach the minimum support.
    for k in list(header_table):
        if header_table[k][0] < min_sup:
            del header_table[k]

    fp_tree = TreeNode("Null Set", 1, None)  # Parent is None for the root node
    if not header_table:
        return fp_tree, {}

    # One global insertion order for the whole data set. sorted() is stable,
    # so ties keep the first-seen order of the header table instead of
    # depending on where the items happen to sit inside each transaction.
    by_support = sorted(header_table, key=lambda k: header_table[k][0], reverse=True)
    rank = {item: position for position, item in enumerate(by_support)}

    # Make room for the node link next to the support information.
    for k in header_table:
        header_table[k] = [header_table[k], None]

    # Second pass: insert the frequent part of each transaction.
    for tran_set in data_set:
        frequent_items = {item for item in tran_set if item in rank}
        if frequent_items:
            ordered_items = sorted(frequent_items, key=rank.__getitem__)
            update_tree(ordered_items, fp_tree, header_table, 1)
    return fp_tree, header_table
def update_tree(items: list, in_tree: TreeNode, header_table: dict, count: int) -> None:
    """
    Update the FP-Tree with a transaction.

    Starting at ``in_tree``, the items are followed downwards one by one.
    An existing child has its counter raised by ``count``; a missing child is
    created with that counter and attached to the node-link chain stored in
    ``header_table[item][1]``. Only the tree and the node links are written;
    the support information in ``header_table[item][0]`` is left untouched.
    An empty ``items`` list leaves everything unchanged.

    Args:
        items: List of items in the transaction.
        in_tree: The current node in the FP-Tree.
        header_table: The header table dictionary with item information.
            It must contain an entry for every item in ``items``.
        count: The count of the transaction.

    Example:
    >>> data_set = [
    ...    ['A', 'B', 'C'],
    ...    ['A', 'C'],
    ...    ['A', 'B', 'E'],
    ...    ['A', 'B', 'C', 'E'],
    ...    ['B', 'E']
    ... ]
    >>> min_sup = 2
    >>> fp_tree, header_table = create_tree(data_set, min_sup)
    >>> fp_tree
    TreeNode('Null Set', 1, None)
    >>> transaction = ['A', 'B', 'E']
    >>> update_tree(transaction, fp_tree, header_table, 1)
    >>> fp_tree
    TreeNode('Null Set', 1, None)
    >>> fp_tree.children['A'].children['B'].children['E'].children
    {}
    >>> fp_tree.children['A'].children['B'].children['E'].count
    2
    >>> header_table['E'][1].name
    'E'
    >>> update_tree([], fp_tree, header_table, 1)
    >>> fp_tree.children['A'].count
    5
    """
    # Walk down iteratively: no list copy per level and no dependence on the
    # interpreter's recursion limit for long transactions.
    node = in_tree
    for item in items:
        child = node.children.get(item)
        if child is not None:
            child.inc(count)
        else:
            child = TreeNode(item, count, node)
            node.children[item] = child
            if header_table[item][1] is None:
                # First node for this item: it becomes the head of the chain.
                header_table[item][1] = child
            else:
                update_header(header_table[item][1], child)
        node = child
def update_header(node_to_test: TreeNode, target_node: TreeNode) -> TreeNode:
    """
    Append ``target_node`` to the end of a node-link chain.

    The chain is followed from ``node_to_test`` through ``node_link`` until a
    node without a successor is found; that node's ``node_link`` is then set
    to ``target_node``.

    Args:
        node_to_test: The node at which the walk along the chain starts.
        target_node: The node to link to.

    Returns:
        The node whose ``node_link`` was set, i.e. the previous end of the
        chain. This is ``node_to_test`` itself only if it had no successor.

    Example:
    >>> data_set = [
    ...    ['A', 'B', 'C'],
    ...    ['A', 'C'],
    ...    ['A', 'B', 'E'],
    ...    ['A', 'B', 'C', 'E'],
    ...    ['B', 'E']
    ... ]
    >>> min_sup = 2
    >>> fp_tree, header_table = create_tree(data_set, min_sup)
    >>> fp_tree
    TreeNode('Null Set', 1, None)
    >>> node1 = TreeNode("A", 3, None)
    >>> node2 = TreeNode("B", 4, None)
    >>> node1
    TreeNode('A', 3, None)
    >>> node1 = update_header(node1, node2)
    >>> node1
    TreeNode('A', 3, None)
    >>> node1.node_link
    TreeNode('B', 4, None)
    >>> node2.node_link is None
    True
    """
    tail = node_to_test
    while (successor := tail.node_link) is not None:
        tail = successor
    # The loop only stops on a node without a successor, so it can be linked
    # directly.
    tail.node_link = target_node
    return tail
def ascend_tree(leaf_node: TreeNode, prefix_path: list[str]) -> None:
    """
    Collect the item names found on the way from a node up to the root.

    Beginning with ``leaf_node`` itself, the name of every node that has a
    parent is appended to ``prefix_path``. The root (the node whose parent is
    None) is not added.

    Args:
        leaf_node: The node to start ascending from.
        prefix_path: A list that receives the item names, in bottom-up order.
            It is modified in place.

    Example:
    >>> data_set = [
    ...    ['A', 'B', 'C'],
    ...    ['A', 'C'],
    ...    ['A', 'B', 'E'],
    ...    ['A', 'B', 'C', 'E'],
    ...    ['B', 'E']
    ... ]
    >>> min_sup = 2
    >>> fp_tree, header_table = create_tree(data_set, min_sup)
    >>> path = []
    >>> ascend_tree(fp_tree.children['A'], path)
    >>> path # ascending from a leaf node 'A'
    ['A']
    """
    current = leaf_node
    while current.parent is not None:
        prefix_path.append(current.name)
        current = current.parent
def find_prefix_path(base_pat: frozenset, tree_node: TreeNode | None) -> dict:  # noqa: ARG001
    """
    Find the conditional pattern base for a given base pattern.

    Every node reachable from ``tree_node`` through ``node_link`` is visited.
    For each of them, the names of its ancestors (without the node itself and
    without the root) form one prefix path, which is stored as a frozenset
    together with the node's count. If several visited nodes produce the same
    set of ancestors, their counts are added up.

    Args:
        base_pat: The base pattern for which to find the conditional pattern base.
            It is not used by the computation and is only kept in the signature.
        tree_node: The node in the FP-Tree at which the node-link chain
            starts. None yields an empty result.

    Returns:
        A dictionary mapping each prefix path (frozenset of item names) to
        its accumulated count. Nodes directly below the root contribute
        nothing because their prefix path is empty.

    Example:
    >>> data_set = [
    ...    ['A', 'B', 'C'],
    ...    ['A', 'C'],
    ...    ['A', 'B', 'E'],
    ...    ['A', 'B', 'C', 'E'],
    ...    ['B', 'E']
    ... ]
    >>> min_sup = 2
    >>> fp_tree, header_table = create_tree(data_set, min_sup)
    >>> fp_tree
    TreeNode('Null Set', 1, None)
    >>> len(header_table)
    4
    >>> base_pattern = frozenset(['A'])
    >>> sorted(find_prefix_path(base_pattern, fp_tree.children['A']))
    []
    >>> e_paths = find_prefix_path(frozenset(['E']), header_table['E'][1])
    >>> sorted((sorted(path), cnt) for path, cnt in e_paths.items())
    [(['A', 'B'], 1), (['A', 'B', 'C'], 1), (['B'], 1)]
    """
    cond_pats: dict = {}
    while tree_node is not None:
        # Collect the ancestors, stopping before the root (parent is None).
        prefix_items: list = []
        ancestor = tree_node.parent
        while ancestor is not None and ancestor.parent is not None:
            prefix_items.append(ancestor.name)
            ancestor = ancestor.parent
        if prefix_items:
            path = frozenset(prefix_items)
            # Accumulate rather than assign, so that two branches with the
            # same ancestor set do not overwrite each other's count.
            cond_pats[path] = cond_pats.get(path, 0) + tree_node.count
        tree_node = tree_node.node_link
    return cond_pats
def mine_tree(
    in_tree: TreeNode,  # noqa: ARG001
    header_table: dict,
    min_sup: int,
    pre_fix: set,
    freq_item_list: list,
) -> None:
    """
    Mine the FP-Tree recursively to discover frequent itemsets.

    The items of ``header_table`` are processed from the least to the most
    frequent one. Each item, combined with ``pre_fix``, is recorded as a
    frequent itemset. Its conditional pattern base is then turned into a
    conditional FP-Tree, which is mined in the same way with the enlarged
    prefix. ``header_table`` itself is only read.

    Args:
        in_tree: The FP-Tree to mine. It is not used by the computation; all
            tree access goes through the node links in ``header_table``.
        header_table: The header table dictionary with item information.
        min_sup: The minimum support threshold.
        pre_fix: A set of items as a prefix for the itemsets being mined.
            It is copied, never modified.
        freq_item_list: A list to store the frequent itemsets. New itemsets
            (sets) are appended to it in place.

    Example:
    >>> data_set = [
    ...    ['A', 'B', 'C'],
    ...    ['A', 'C'],
    ...    ['A', 'B', 'E'],
    ...    ['A', 'B', 'C', 'E'],
    ...    ['B', 'E']
    ... ]
    >>> min_sup = 2
    >>> fp_tree, header_table = create_tree(data_set, min_sup)
    >>> fp_tree
    TreeNode('Null Set', 1, None)
    >>> frequent_itemsets = []
    >>> mine_tree(fp_tree, header_table, min_sup, set([]), frequent_itemsets)
    >>> expe_itm = [{'C'}, {'C', 'A'}, {'E'}, {'A', 'E'}, {'E', 'B'}, {'A'}, {'B'}]
    >>> all(expected in frequent_itemsets for expected in expe_itm)
    True
    >>> larger = [{'A', 'B'}, {'B', 'C'}, {'A', 'B', 'C'}, {'A', 'B', 'E'}]
    >>> all(expected in frequent_itemsets for expected in larger)
    True
    >>> len(frequent_itemsets)
    11
    """
    # Least frequent items first; the sort key is the support entry that
    # create_tree stores at position 0 of every header value.
    big_l = sorted(header_table, key=lambda item: header_table[item][0])
    for base_pat in big_l:
        new_freq_set = pre_fix.copy()
        new_freq_set.add(base_pat)
        freq_item_list.append(new_freq_set)
        cond_patt_bases = find_prefix_path(base_pat, header_table[base_pat][1])
        # create_tree counts every transaction once, so each prefix path is
        # repeated as often as it occurred. Passing only the dictionary keys
        # would silently reduce every support to 1.
        weighted_paths = [
            path for path, support in cond_patt_bases.items() for _ in range(support)
        ]
        my_cond_tree, my_head = create_tree(weighted_paths, min_sup)
        if my_head:
            # Only a non-empty header table can contribute further itemsets.
            mine_tree(my_cond_tree, my_head, min_sup, new_freq_set, freq_item_list)
if __name__ == "__main__":
    from doctest import testmod

    # Run the examples embedded in the docstrings first.
    testmod()

    # Small demo: shopping baskets mined with a minimum support of 3.
    data_set: list[frozenset] = [
        frozenset(basket)
        for basket in (
            ("bread", "milk", "cheese"),
            ("bread", "milk"),
            ("bread", "diapers"),
            ("bread", "milk", "diapers"),
            ("milk", "diapers"),
            ("milk", "cheese"),
            ("diapers", "cheese"),
            ("bread", "milk", "cheese", "diapers"),
        )
    ]
    print(f"{len(data_set) = }")

    fp_tree, header_table = create_tree(data_set, 3)
    print(f"{fp_tree = }")
    print(f"{len(header_table) = }")

    freq_items: list = []
    mine_tree(
        fp_tree,
        header_table,
        min_sup=3,
        pre_fix=set(),
        freq_item_list=freq_items,
    )
    print(f"{freq_items = }")