-
Notifications
You must be signed in to change notification settings - Fork 1
/
tda_pipeline.py
189 lines (160 loc) · 7.31 KB
/
tda_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# This file is part of the Gudhi Library - https://gudhi.inria.fr/ - which is released under MIT.
# See file LICENSE or go to https://gudhi.inria.fr/licensing/ for full license details.
# Author(s): Vincent Rouvreau
#
# Copyright (C) 2021 Inria
#
# Modification(s):
# - YYYY/MM Author: Description of the modification
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from gudhi import RipsComplex
from joblib import Parallel, delayed
class DataSelector(BaseEstimator, TransformerMixin):
    """
    Scikit-learn transformer that slices a date-indexed data frame into
    fixed-size sliding windows of `w` consecutive rows.
    """

    def __init__(self, start=0, end=250, w=80, n_jobs=None):
        """
        Constructor for the DataSelector class.

        Parameters:
            start (int): Positional index in the data frame where the analysis starts. Default value is `0`.
            end (int): Positional index in the data frame where the analysis ends (exclusive). Default value is `250`.
            w (int): The window size in days (rows per window). Default value is `80`.
            n_jobs (int): cf. https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html
        """
        self.start = start
        self.end = end
        self.w = w
        self.n_jobs = n_jobs

    def fit(self, X, Y=None):
        """
        Nothing to be done, but useful when included in a scikit-learn Pipeline.
        """
        return self

    def transform(self, X, Y=None):
        """
        Slice `X` into sliding windows of `w` rows each.

        Parameters:
            X (Pandas DataFrame): Indexed by date, time series of indices.

        Returns:
            list of numpy.ndarray: One `w`-row array per day; the first window
            ends at positional index `start + w`, the last one at `end - 1`.
        """
        # One window per end position `idx`; each window holds the `w` rows preceding `idx`.
        return [X.iloc[(idx - self.w):idx].values for idx in range(self.start + self.w, self.end)]
class RipsPersistence(BaseEstimator, TransformerMixin):
    """
    This is a class for computing the persistence diagrams from a Rips complex.
    """

    def __init__(
        self,
        max_rips_dimension=2,
        max_edge_length=float("inf"),
        collapse_edges=True,
        max_persistence_dimension=0,
        only_this_dim=-1,
        homology_coeff_field=11,
        min_persistence=0.0,
        n_jobs=None,
    ):
        """
        Constructor for the RipsPersistence class.

        Parameters:
            max_rips_dimension (int): Rips expansion until this dimension. Default value is `2`.
            max_edge_length (float): Maximal edge length to be taken into account. Default value is `+Inf`.
            collapse_edges (bool): Whether to collapse the edges of the 1-skeleton before expanding the
                complex to `max_rips_dimension`. Default value is `True`.
            max_persistence_dimension (int): The returned persistence diagrams maximal dimension. Default value is `0`.
                Ignored if `only_this_dim` is set.
            only_this_dim (int): The returned persistence diagrams dimension. If `only_this_dim` is set,
                `max_persistence_dimension` will be ignored.
                Short circuit the use of :class:`~gudhi.sklearn.post_processing.DimensionSelector` when only one
                dimension matters. Default value is `-1` (disabled).
            homology_coeff_field (int): The homology coefficient field. Must be a prime number. Default value is 11.
            min_persistence (float): The minimum persistence value to take into account (strictly greater than
                `min_persistence`). Default value is `0.0`. Set `min_persistence` to `-1.0` to see all values.
            n_jobs (int): cf. https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html
        """
        self.max_rips_dimension = max_rips_dimension
        self.max_edge_length = max_edge_length
        self.collapse_edges = collapse_edges
        self.max_persistence_dimension = max_persistence_dimension
        self.only_this_dim = only_this_dim
        self.homology_coeff_field = homology_coeff_field
        self.min_persistence = min_persistence
        self.n_jobs = n_jobs

    def fit(self, X, Y=None):
        """
        Nothing to be done, but useful when included in a scikit-learn Pipeline.
        """
        return self

    def __computed_simplex_tree(self, points):
        # Builds the Rips simplex tree for one point cloud and computes its
        # persistence; shared by both per-cloud transform paths.
        rips = RipsComplex(points=points, max_edge_length=self.max_edge_length)
        if self.collapse_edges:
            # Collapse edges on the 1-skeleton first, then expand, instead of
            # expanding the full complex directly.
            stree = rips.create_simplex_tree(max_dimension=1)
            stree.collapse_edges()
            stree.expansion(self.max_rips_dimension)
        else:
            stree = rips.create_simplex_tree(max_dimension=self.max_rips_dimension)
        stree.compute_persistence(
            homology_coeff_field=self.homology_coeff_field, min_persistence=self.min_persistence
        )
        return stree

    def __transform(self, points):
        # One diagram per dimension, from 0 up to `max_persistence_dimension`.
        stree = self.__computed_simplex_tree(points)
        return [
            stree.persistence_intervals_in_dimension(dim) for dim in range(self.max_persistence_dimension + 1)
        ]

    def __transform_only_this_dim(self, points):
        # Single diagram, restricted to dimension `only_this_dim`.
        return self.__computed_simplex_tree(points).persistence_intervals_in_dimension(self.only_this_dim)

    def transform(self, X, Y=None):
        """
        Compute all the Rips complexes and their associated persistence diagrams.

        Parameters:
            X (list of list of double OR list of numpy.ndarray): List of point clouds.

        Returns:
            Persistence diagrams in the format:

              - If `only_this_dim` was set to `n`: `[array( Hn(X[0]) ), array( Hn(X[1]) ), ...]`
              - else: `[[array( H0(X[0]) ), array( H1(X[0]) ), ...], [array( H0(X[1]) ), array( H1(X[1]) ), ...], ...]`
        """
        # threads is preferred as rips construction and persistence computation releases the GIL
        if self.only_this_dim == -1:
            return Parallel(n_jobs=self.n_jobs, prefer="threads")(delayed(self.__transform)(cells) for cells in X)
        return Parallel(n_jobs=self.n_jobs, prefer="threads")(
            delayed(self.__transform_only_this_dim)(cells) for cells in X
        )
class LPNorm(BaseEstimator, TransformerMixin):
    """
    This is a class to compute Landscape Lp Norm L1 and L2.
    """

    def __init__(self, n_jobs=None):
        """
        Constructor for the LPNorm class.

        Parameters:
            n_jobs (int): cf. https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html
        """
        self.n_jobs = n_jobs

    def fit(self, X, Y=None):
        """
        Nothing to be done, but useful when included in a scikit-learn Pipeline.
        """
        return self

    def __transform(self, landscape):
        # `[L1, L2]` norm pair of a single landscape.
        return [np.linalg.norm(landscape, ord=1), np.linalg.norm(landscape, ord=2)]

    def transform(self, X, Y=None):
        """
        Computes Landscape Lp Norm L1 and L2.

        Parameters:
            X (iterable of numpy array): Landscapes.

        Returns:
            list of [float, float]: The `[L1, L2]` norm pair of each landscape.
        """
        # NOTE(review): computed serially; `n_jobs` is currently unused here.
        # A joblib-parallel variant could be reinstated if the per-landscape
        # cost ever warrants it.
        return [self.__transform(landscape) for landscape in X]