-
Notifications
You must be signed in to change notification settings - Fork 1
/
proba_helpers.py
69 lines (51 loc) · 2.54 KB
/
proba_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import numpy as np
from collections import Counter
from random_steiner_tree import random_steiner_tree
from graph_helpers import swap_end_points, extract_nodes_from_tuples
def tree_probability_gt(out_degree, p_dict, edges, using_log=True):
if using_log:
log_numer = np.log([p_dict[(u, v)] for u, v in edges]).sum()
log_denum = np.log([out_degree[u] for u, _ in edges]).sum()
return log_numer - log_denum
else:
numer = np.product([p_dict[(u, v)] for u, v in edges])
denum = np.product([out_degree[u] for u, _ in edges])
assert denum > 0, [out_degree[u] for u, _ in edges]
return numer / denum
def cascade_probability_gt(g, p_dict, cascade_edges, g_nx, using_log=True):
"""g and g_nx are not used
"""
if using_log:
# to prevent floating point underflow
return np.log([p_dict[(u, v)] for u, v in cascade_edges]).sum()
else:
return np.product([p_dict[(u, v)] for u, v in cascade_edges])
def ic_cascade_probability_gt(g, p_dict, cascade_edges, nbr_dict, using_log=True):
infected_nodes = extract_nodes_from_tuples(cascade_edges)
if using_log:
# to prevent floating point underflow
log_probas_from_active_edges = np.log([p_dict[(u, v)] for u, v in cascade_edges]).sum()
else:
probas_from_active_edges = np.product([p_dict[(u, v)] for u, v in cascade_edges])
inactive_edges = {(w, u) # here it should (w, u) because of graph transpose
for u, _ in cascade_edges
for w in nbr_dict[u]
if w not in infected_nodes}
inactive_edges -= set(cascade_edges)
if using_log:
log_probas_from_inactive_edges = np.log([(1 - p_dict[(u, v)]) for u, v in inactive_edges]).sum()
return log_probas_from_active_edges + log_probas_from_inactive_edges
else:
probas_from_inactive_edges = np.product([(1 - p_dict[(u, v)]) for u, v in inactive_edges])
return probas_from_active_edges * probas_from_inactive_edges
def tree_probability_nx(g, edges):
numer = np.product([g[u][v]['weight'] for u, v in edges])
denum = np.product([g.out_degree(u, weight='weight') for u, v in edges])
return numer / denum
def casccade_probability_nx(g, cascade_edges):
return np.product([g[u][v]['weight'] for u, v in cascade_edges])
def sampled_tree_freqs(gi, X, root, sampling_method, N):
trees = [swap_end_points(random_steiner_tree(gi, X, root, method=sampling_method))
for i in range(N)]
tree_freq = Counter(trees)
return tree_freq