-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactored the code, added a description
- Loading branch information
1 parent
a654c46
commit 7a1da9d
Showing
5 changed files
with
68 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,38 +1,68 @@ | ||
from abc import ABCMeta, abstractmethod | ||
import numpy as np | ||
import pandas as pd | ||
from sklearn.metrics import pairwise_distances, roc_curve, roc_auc_score | ||
from sklearn.model_selection import StratifiedKFold, KFold, train_test_split | ||
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis | ||
from copy import deepcopy | ||
from joblib import Parallel, delayed | ||
from scipy import interpolate | ||
from scipy.signal import argrelmax | ||
|
||
from .cpdc import ChangePointDetectionBase | ||
from roerich.scores.fd import frechet_distance | ||
from roerich.scores.mmd import maximum_mean_discrepancy | ||
from roerich.scores.energy import energy_distance | ||
|
||
|
||
class ScoreCalculator(ChangePointDetectionBase): | ||
class SlidingWindows(ChangePointDetectionBase): | ||
|
||
def __init__(self, metric=None, func=None, periods=1, window_size=100, step=1, n_runs=1): | ||
def __init__(self, metric=None, periods=1, window_size=100, step=1, n_runs=1): | ||
super().__init__(periods=periods, window_size=window_size, step=step, n_runs=n_runs) | ||
self.metric = metric | ||
self.func = func | ||
|
||
""" | ||
Change point detection algorithm based on a dissimilarity score between sliding reference and test windows. | ||
Parameters: | ||
----------- | ||
periods: int, default=1 | ||
Number of consecutive observations of a time series, considered as one input vector. | ||
The signal is considered as an autoregression process (AR) for classification. In most cases periods=1 | ||
will be a good choice. | ||
window_size: int, default=100 | ||
Number of consecutive observations of a time series in test and reference | ||
windows. Recommendation: select the value so that there is only one change point within 2*window_size | ||
observations of the signal. | ||
step: int, default=1 | ||
The algorithm estimates the change point detection score for every <step>-th observation. step > 1 helps | ||
to speed up the algorithm. | ||
n_runs: int, default=1 | ||
Number of times the binary classifier runs on each pair of test and reference | ||
windows. Observations in the windows are divided randomly between train and validation sample every time. | ||
n_runs > 1 helps to reduce noise in the change point detection score. | ||
metric: str/function, default=None | ||
Function that gives the measure of dissimilarity between data points in windows. | ||
Metric should be one of: "energy", "fd", "mmd"; or a function should be passed. | ||
Function must be in the following format: | ||
Parameters: | ||
----------- | ||
X_ref: numpy.ndarray | ||
Matrix of reference observations. | ||
X_test: numpy.ndarray | ||
Matrix of test observations. | ||
Returns: | ||
-------- | ||
score: float | ||
Estimated change point detection score for a pair of windows. | ||
""" | ||
|
||
def reference_test_predict(self, X_ref, X_test): | ||
|
||
if self.metric == "EnergyDist": | ||
n = X_ref.shape[0] | ||
E = 2*pairwise_distances(X_ref, X_test, metric='euclidean') - pairwise_distances(X_test, metric='euclidean') - pairwise_distances(X_ref, metric='euclidean') | ||
return np.sum(E) / n ** 2 | ||
elif self.metric == "FrechetDist": | ||
if self.metric == "energy": | ||
return energy_distance(X_ref, X_test) | ||
elif self.metric == "fd": | ||
return frechet_distance(X_ref, X_test) | ||
elif self.metric == "MaxMeanDisc": | ||
elif self.metric == "mmd": | ||
return maximum_mean_discrepancy(X_ref, X_test) | ||
elif self.func is not None: | ||
return self.func(X_ref, X_test) | ||
elif callable(self.metric): | ||
return self.metric(X_ref, X_test) | ||
else: | ||
raise ValueError("metric should be one of: EnergyDist, FrechetDist, MaxMeanDisc; or a function should be " | ||
"passed") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
import numpy as np | ||
from sklearn.metrics import pairwise_distances | ||
|
||
|
||
def energy_distance(x, y):
    """
    Estimate the energy distance between two samples.

    The score is 2 * mean(d(x_i, y_j)) - mean(d(y_i, y_j)) - mean(d(x_i, x_j)),
    where d is the Euclidean distance between rows. Each term is averaged over
    its own number of pairs, so the two samples may contain different numbers
    of rows. For equally sized samples this equals the sum of
    2 * D_xy - D_yy - D_xx divided by n ** 2.

    Parameters:
    -----------
    x: numpy.ndarray
        Matrix of observations of the first sample, one observation per row.
    y: numpy.ndarray
        Matrix of observations of the second sample, one observation per row.

    Returns:
    --------
    score: float
        Estimated energy distance between the two samples.
    """
    # Cross-sample and within-sample Euclidean distance matrices.
    d_xy = pairwise_distances(x, y, metric='euclidean')
    d_yy = pairwise_distances(y, metric='euclidean')
    d_xx = pairwise_distances(x, metric='euclidean')
    # Averaging each matrix separately avoids subtracting arrays of different
    # shapes, which fails (or silently broadcasts) when len(x) != len(y).
    return 2 * np.mean(d_xy) - np.mean(d_yy) - np.mean(d_xx)