From 20d9e9f847c3a4fbd5062952040bb62e844a8938 Mon Sep 17 00:00:00 2001 From: GishB Date: Wed, 20 Mar 2024 23:20:32 +0300 Subject: [PATCH] add setup py and LICENSE --- build/lib/data/CloudData.py | 58 +++++ build/lib/data/DataTransformers.py | 72 ++++++ build/lib/data/SythData.py | 173 ++++++++++++++ build/lib/data/__init__.py | 0 build/lib/examples/DefaultStreamlitApp.py | 38 +++ build/lib/examples/__init__.py | 0 .../OfflineCPD_experiments/__init__.py | 0 .../OnlineCPD_experiments/__init__.py | 0 .../OnlineCPD_experiments/kalman_runner.py | 45 ++++ .../kalman_sst_runner.py | 0 .../OnlineCPD_experiments/sst_runner.py | 0 build/lib/models/ModelConstructors.py | 122 ++++++++++ build/lib/models/ProbabilityBased.py | 189 +++++++++++++++ build/lib/models/SubspaceBased.py | 216 +++++++++++++++++ build/lib/models/__init__.py | 0 build/lib/optimization/WSSAlgorithms.py | 221 ++++++++++++++++++ build/lib/optimization/__init__.py | 0 build/lib/utils/DataTransformers.py | 119 ++++++++++ build/lib/utils/Reports.py | 81 +++++++ build/lib/utils/StreamlitFunctions.py | 200 ++++++++++++++++ build/lib/utils/__init__.py | 0 setup.py | 38 +++ tests/test_ProbabilityBased.py | 6 - tests/test_SubspaceBased.py | 8 +- 24 files changed, 1573 insertions(+), 13 deletions(-) create mode 100644 build/lib/data/CloudData.py create mode 100644 build/lib/data/DataTransformers.py create mode 100644 build/lib/data/SythData.py create mode 100644 build/lib/data/__init__.py create mode 100644 build/lib/examples/DefaultStreamlitApp.py create mode 100644 build/lib/examples/__init__.py create mode 100644 build/lib/experiments/OfflineCPD_experiments/__init__.py create mode 100644 build/lib/experiments/OnlineCPD_experiments/__init__.py create mode 100644 build/lib/experiments/OnlineCPD_experiments/kalman_runner.py create mode 100644 build/lib/experiments/OnlineCPD_experiments/kalman_sst_runner.py create mode 100644 build/lib/experiments/OnlineCPD_experiments/sst_runner.py create mode 100644 build/lib/models/ModelConstructors.py create mode 100644 build/lib/models/ProbabilityBased.py create mode 100644 build/lib/models/SubspaceBased.py create mode 100644 build/lib/models/__init__.py create mode 100644 build/lib/optimization/WSSAlgorithms.py create mode 100644 build/lib/optimization/__init__.py create mode 100644 build/lib/utils/DataTransformers.py create mode 100644 build/lib/utils/Reports.py create mode 100644 build/lib/utils/StreamlitFunctions.py create mode 100644 build/lib/utils/__init__.py diff --git a/build/lib/data/CloudData.py b/build/lib/data/CloudData.py new file mode 100644 index 0000000..a779dce --- /dev/null +++ b/build/lib/data/CloudData.py @@ -0,0 +1,58 @@ +import pandas as pd +from io import StringIO +import requests + + +class DrillingData: + """ This class helps to load las files for one of selected horizontal well. + + Notes: + - If you define name which does not exist then program will raise NameError! + - All data are restored on public yandex cloud server. + - If you need to load just raw data then use load_raw_data method. + + Attributes: + dataset_name: indicate which well you need to load data. 
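+        sep: column separator passed to pandas.read_csv ("|" is set automatically for the xxxAA* wells).
+
+    Example:
+        A minimal usage sketch (it assumes the public Yandex Cloud links listed in url_dict are still reachable):
+
+        >>> loader = DrillingData(dataset_name="229G")
+        >>> df = loader.get()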
+ """ + + def __init__(self, + dataset_name: str = "default", + sep: str = ","): + self.url_dict = { + "229G": "https://storage.yandexcloud.net/cloud-files-public/229G_las_files.csv", + "231G": "https://storage.yandexcloud.net/cloud-files-public/231G_las_files.csv", + "237G": "https://storage.yandexcloud.net/cloud-files-public/237G_las_files.csv", + "xxxAA564G": "https://storage.yandexcloud.net/cloud-files-public/dataframe.csv", + "xxxAA684G": "https://storage.yandexcloud.net/cloud-files-public/dataframe.csv" + } + self.dataset_name = dataset_name + self.sep = sep + if dataset_name not in ["default", "229G", "231G", "237G", "xxxAA684G", "xxxAA564G"]: + raise NameError("There is not such dataset name.") + if dataset_name in ["xxxAA684G", "xxxAA564G"]: + self.sep = "|" + + def load_raw_data(self, url: str) -> pd.DataFrame: + """ Load las files as it is in pandas format. + + Warning: + - These files are available only for education purposes and shall not be used for any other points. + + Notes: + - value like -9999 means that data has been missing. + - unitless column means type of layer. + - uR/h the most important drilling data columns for analysis. + + Returns: + pandas dataframe with all available columns and rows from chosen las file. + """ + return pd.read_csv(StringIO(requests.get(url).content.decode('utf-8')), sep=self.sep) + + def get(self) -> pd.DataFrame: + if self.dataset_name == "default": + raw_data = self.load_raw_data(url=self.url_dict.get("237G")) + else: + raw_data = self.load_raw_data(url=self.url_dict.get(self.dataset_name)) + if self.dataset_name in ["xxxAA684G", "xxxAA564G"]: + raw_data = raw_data[raw_data[raw_data.columns[0]] == self.dataset_name] + return raw_data.reset_index(drop=True) diff --git a/build/lib/data/DataTransformers.py b/build/lib/data/DataTransformers.py new file mode 100644 index 0000000..0494e81 --- /dev/null +++ b/build/lib/data/DataTransformers.py @@ -0,0 +1,72 @@ +import pandas as pd +import numpy as np + + +class CloudDataTransformer: + """ This class helps to transform raw data as it expected for experiments in the project. + + Attributes: + df: dataframe which was loaded from the cloud. 
+ dataset_name: dataset name which was loaded via CloudData + + """ + + def __init__(self, df: pd.DataFrame = None, + dataset_name: str = "default"): + self.df = df + self.dataset_name = dataset_name + + if self.df is None: + raise ValueError("dataset has not been defined!") + + if self.dataset_name not in ["default", "229G", "231G", "237G", "xxxAA684G", "xxxAA564G"]: + raise NameError("There is not such dataset name.") + + @staticmethod + def add_time_column(df: pd.DataFrame) -> pd.DataFrame: + df['time'] = np.arange(0, df.shape[0] * 1, 1).astype('datetime64[s]') + df = df.set_index('time') + return df + + @staticmethod + def replace_nan_values(df: pd.DataFrame) -> pd.DataFrame: + return df.replace(['-9999', -9999, 'missing', '#'], np.nan) + + def drop_nan_values(self, df: pd.DataFrame) -> pd.DataFrame: + df = self.replace_nan_values(df) + df = df[df['unitless'].notna()] + df = df[df['uR/h'].notna()] + return df + + @staticmethod + def add_expected_change_points(df: pd.DataFrame) -> pd.DataFrame: + cps_list = [1 if df['unitless'].iloc[i] != df['unitless'].iloc[i + 1] else 0 for i in range(df.shape[0] - 1)] + df['CPs'] = cps_list + [0] + return df + + @staticmethod + def take_expected_columns(df: pd.DataFrame) -> pd.DataFrame: + return df[["uR/h", "ohmm", "ohmm.6", "m/hr", "unitless", "CPs"]].reset_index(drop=True) + + @staticmethod + def rename_column_special(df: pd.DataFrame) -> pd.DataFrame: + return df.rename(columns={"uR/h": "GR", + "ohmm": "Resist_short", + "ohmm.6": "Resist_long", + "unitless": "LITHOLOGY", + "m/hr": "DrillingSpeed"}) + + def transform(self) -> pd.DataFrame: + """ Transform data initial point. + + Returns: + DataFrame as it expected to be for any future tasks. + """ + df = self.df + if self.dataset_name in ["xxxAA684G", "xxxAA564G"]: + df = self.replace_nan_values(df) + df = self.add_expected_change_points(df) + df = self.take_expected_columns(df) + df = self.rename_column_special(df) + df = self.add_time_column(df) + return df diff --git a/build/lib/data/SythData.py b/build/lib/data/SythData.py new file mode 100644 index 0000000..667da9c --- /dev/null +++ b/build/lib/data/SythData.py @@ -0,0 +1,173 @@ +import numpy as np +import pandas as pd + + +class SythDataConstructor: + """ This is fundament class which should be used for any syth data generators. + + Notes: + length_data % cpu_numbers has to be equal 0! + + Attributes: + frequency: which freq should be used seconds, minutes, days. + length_data: just how many points should be generated. + cps_number: number of change points over generated data. + """ + + def __init__(self, + white_noise_level: str = "default", + frequency: str = "s", + length_data: int = 24 * 7 * 15 + 15, + cps_number: int = 15): + self.frequency = frequency + self.length_data = length_data + self.cps_number = cps_number + + self.white_mean = 0 + if white_noise_level == "default": + self.white_std = 0.5 + elif white_noise_level == "max": + self.white_std = 1 + elif white_noise_level == "min": + self.white_std = 0.01 + else: + raise NameError("Not implemented white noise level!") + + if length_data % cps_number != 0: + raise ValueError("Not equal length of data and cpu_numbers expected from syth data!") + + def generate_empty_df(self) -> pd.DataFrame: + """ Generate dataframe with timestamps. 
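+            The index is a pandas DatetimeIndex named "time" that starts at 10/07/1999 and uses the configured frequency.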
+ + Returns: + pandas dataframe with expected frequency and length + """ + return pd.DataFrame(index=pd.date_range(start="10/07/1999", + periods=self.length_data, + freq=self.frequency, + normalize=True, + inclusive="both", + name="time")) + + def generate_white_noise(self) -> np.array: + """ Generate random noise for your data. + + Returns: + array of white noise based on expected length of data. + """ + return np.random.normal(self.white_mean, + self.white_std, + size=self.length_data) + + def generate_array_of_change_points(self) -> np.array: + """ Generate values which represent CPs over syth data. + + Returns: + numpy array of int values where 1 is change point and 0 is default value. + """ + cps_index = [i for i in range(self.length_data // self.cps_number, + self.length_data, + self.length_data // self.cps_number)] + dp = [0 if i not in cps_index else 1 for i in range(self.length_data)] + return np.array(dp) + + def generate_data(self) -> np.array: + """ Generate syth data array + + Returns: + expected syth data based on class idea. + """ + ... + + def get(self) -> pd.DataFrame: + """ Get syth data. + + Returns: + pandas dataframe with syth data and time index. + """ + ... + + +class LinearSteps(SythDataConstructor): + def get_linear_array(self, + beta_past: float, + k_past: float, + beta_mutation_coeff: float, + k_mutation_coeff: float) -> tuple[np.array, float, float]: + """ Generate random linear array based on past observation + + Notes: + beta_mutation_coeff as well as k_mutation_coeff should be defined based on expertise. These coefficients + help to connect nearest arrays. + + Args: + beta_past: beta value in the past array. + k_past: k coefficient in the past array. + beta_mutation_coeff: treshold for beta deviation. + k_mutation_coeff: treshold for k coeff deviation. + + Returns: + tuple of generated data and info for this generations beta and k_coeff. + """ + beta = np.random.uniform(beta_past, 1) + k_coeff = np.random.uniform(k_past, 1) + if np.random.uniform(0, 1) > beta_mutation_coeff: + beta = np.random.uniform(-1, 1) + if np.random.uniform(0, 1) > k_mutation_coeff: + k_coeff = np.random.uniform(-1, 1) + dp = [k_coeff * x + beta for x in range(0, self.length_data // self.cps_number)] + return np.array(dp), beta, k_coeff + + def generate_data(self, initial_beta: float = -0.01, + initial_k: float = 0.2, + beta_mutation_coeff: float = 0.8, + k_mutation_coeff: float = 0.2) -> np.array: + dp = [] + for steps in range(self.cps_number): + temp_info = self.get_linear_array(initial_beta, + initial_k, + beta_mutation_coeff, + k_mutation_coeff) + dp.extend(temp_info[0]) + initial_beta = temp_info[1] + initial_k = temp_info[2] + return np.array(dp) + + def get(self): + df = self.generate_empty_df() + df['x'] = np.add(self.generate_data(), self.generate_white_noise()) + df['CPs'] = self.generate_array_of_change_points() + return df + + +class SinusoidWaves(SythDataConstructor): + def get_sinusoid_array(self, beta_past: float, beta_mutation_coeff: float) -> tuple[np.array, float]: + """ Generate sinusoid waves over expected shape. + + Args: + beta_past: beta coefficient for sinus wave. + beta_mutation_coeff: coeff for mutation operator. 
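+
+        Notes:
+            The amplitude is first drawn from [beta_past, 2]; with probability (1 - beta_mutation_coeff)
+            it is resampled uniformly from [-2, 2], which is what creates a visible jump between segments.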
+ + Returns: + array of sinusoid data + """ + beta_past = np.random.uniform(low=beta_past, high=2) + if np.random.uniform(low=0, high=1) > beta_mutation_coeff: + beta_past = np.random.uniform(low=-2, high=2) + x = np.linspace(start=0, stop= self.length_data // self.cps_number, num=self.length_data // self.cps_number) + return np.sin(x) * beta_past, beta_past + + def generate_data(self, initial_beta: float = 0.5, beta_mutation_coeff: float = 0.5) -> np.array: + dp = [] + for steps in range(self.cps_number): + temp_info = self.get_sinusoid_array(initial_beta, + beta_mutation_coeff) + dp.extend(temp_info[0]) + initial_beta = temp_info[1] + return np.array(dp) + + def get(self): + df = self.generate_empty_df() + df['x'] = np.add(self.generate_data(), self.generate_white_noise()) + df['CPs'] = self.generate_array_of_change_points() + return df diff --git a/build/lib/data/__init__.py b/build/lib/data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/examples/DefaultStreamlitApp.py b/build/lib/examples/DefaultStreamlitApp.py new file mode 100644 index 0000000..e0fb5eb --- /dev/null +++ b/build/lib/examples/DefaultStreamlitApp.py @@ -0,0 +1,38 @@ +import streamlit as st +import sys + +sys.path.append("..") + +import utils.StreamlitFunctions as useful + +st.title('Change Point Detection examples.') + +st.sidebar.header('UI Model pipeline') +option_model = st.sidebar.selectbox( + 'Select CPD model', + ("Singular Sequence Decomposition", "Kalman Filter")) +model_params = useful.init_model_params(model_name=option_model) + +option_data = st.sidebar.selectbox( + 'Select dataset', + ("None", "Syth-Steps", "Syth-Sinusoid")) + +df = None +if option_data != "None": + data_loader_params = useful.init_data_loader_params() + df = useful.data_loader(option=option_data, params=data_loader_params) + useful.data_info(df) + useful.data_plot(df) + +option_start_model = st.sidebar.selectbox( + 'Run selected model', + ("None", "RUN!")) + +df_updated = None +if option_start_model != "None" and df is not None: + df_updated = useful.init_model_pipeline(name_model=option_model, params=model_params, df=df) + +if df_updated is not None: + summary_df = useful.model_summary(df=df_updated) + useful.data_info(summary_df) + useful.plot_results(df_updated) diff --git a/build/lib/examples/__init__.py b/build/lib/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/experiments/OfflineCPD_experiments/__init__.py b/build/lib/experiments/OfflineCPD_experiments/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/experiments/OnlineCPD_experiments/__init__.py b/build/lib/experiments/OnlineCPD_experiments/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/experiments/OnlineCPD_experiments/kalman_runner.py b/build/lib/experiments/OnlineCPD_experiments/kalman_runner.py new file mode 100644 index 0000000..253580e --- /dev/null +++ b/build/lib/experiments/OnlineCPD_experiments/kalman_runner.py @@ -0,0 +1,45 @@ +import sys +import os +sys.path.append(os.path.abspath("../..")) + +from utils import libs_cpd, Reports as crtest +import data.CloudData as dtest +import models.ProbabilityBased as kalman +import utils.GeneralFunctions as optf + + +if len(sys.argv) > 1: + i = dtest.list_links[int(sys.argv[1])]; + name = str(sys.argv[2]); +else: + i = dtest.list_links[0]; + name = "test"; + +df = dtest.df_expirement(i) + +window_length_savgol = libs_cpd.WindowSizeSelection(time_series = list(df.GR), + wss_algorithm = 
'summary_statistics_subsequence').get_window_size()[0] +norm_filter_gr = optf.normalization_linear(optf.filter_Savgol(df.Resist_short, window_length_savgol)) +window_length = libs_cpd.WindowSizeSelection(time_series = norm_filter_gr, + wss_algorithm = 'dominant_fourier_frequency', window_max=1000, window_min=50).get_window_size()[0] + +cps_list_kalman = kalman.online_detection(list(df['GR']), window=window_length, queue_window=10, treshold_coef=4.3) +df['cps_kalman'] = cps_list_kalman + +tsad_average_results = crtest.tsad_average(df.cps_kalman, df.CPs) +tsad_nab_results = crtest.tsad_nab(df.cps_kalman, df.CPs) +tsad_nab_results.update(tsad_average_results) + +report = crtest.create_report(tsad_nab_results) + + +#downloand report and image with predicted labels +libs_cpd.plt.figure(figsize=(12, 3)) +libs_cpd.plt.plot(list(df.CPs), label='original CPs') +libs_cpd.plt.plot(cps_list_kalman, label='predicted CPs') +libs_cpd.plt.legend(loc="center right", bbox_to_anchor=(1.18, 0.5)) +libs_cpd.plt.savefig('predicted_' + name + '.jpg') + +with open('./predicted_'+name+'.txt','w') as out: + for key,val in report.items(): + out.write('{}:{}\n'.format(key,val)) diff --git a/build/lib/experiments/OnlineCPD_experiments/kalman_sst_runner.py b/build/lib/experiments/OnlineCPD_experiments/kalman_sst_runner.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/experiments/OnlineCPD_experiments/sst_runner.py b/build/lib/experiments/OnlineCPD_experiments/sst_runner.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/models/ModelConstructors.py b/build/lib/models/ModelConstructors.py new file mode 100644 index 0000000..7fba306 --- /dev/null +++ b/build/lib/models/ModelConstructors.py @@ -0,0 +1,122 @@ +""" +Highly used parent model classes functions for all others implemented models. +""" +import sys +import numpy as np + +from typing import Optional + +from utils.DataTransformers import Filter, Scaler +from optimization.WSSAlgorithms import WindowSizeSelection + +sys.path.append("..") + + +class ChangePointDetectionConstructor(WindowSizeSelection, Filter, Scaler): + """ Basic class to work with ChangePoint detection models. + + Attributes: + parameters: dict of parameters for selected model. + + """ + def __init__(self, + fast_optimize_algorithm: str = 'summary_statistics_subsequence', + is_cps_filter_on: bool = True, + is_fast_parameter_selection: bool = True, + threshold_std_coeff: float = 3.1, + queue_window: int = None, + sequence_window: int = None, + lag: int = None, + is_cumsum_applied: bool = True, + is_z_normalization: bool = True, + is_squared_residual: bool = True): + """ Highly used parameters. + + Args: + fast_optimize_algorithm: algorithm name from WSS optimization. + is_cps_filter_on: is filter change points option based on queue window. + is_fast_parameter_selection: is fast optimization applied. + threshold_std_coeff: threshold param for abnormal residuals. + queue_window: max limited window to filter cps. + sequence_window: max length of subsequence which is used for analysis on each step. + lag: step between two subsequences. + is_cumsum_applied: should we use cumsum algorithm for CPs detection. + is_z_normalization: normalization over residual data. 
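+            is_squared_residual: if True, residuals are squared before thresholding to emphasize large deviations.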
+ """ + + self.parameters = { + "is_fast_parameter_selection": is_fast_parameter_selection, + "fast_optimize_algorithm": fast_optimize_algorithm, + "is_cps_filter_on": is_cps_filter_on, + "threshold_std_coeff": threshold_std_coeff, + "queue_window" : queue_window, + "sequence_window": sequence_window, + "lag": lag, + "is_cumsum_applied": is_cumsum_applied, + "is_z_normalization": is_z_normalization, + "is_squared_residual": is_squared_residual + } + + def fit(self, + x_train: np.array, + y_train: Optional[np.array]): + """ Search for the best model hyperparameters. + + Notes: + 1. In default pipe model will use window size selection algorithm. + 2. You can default all parameters manually in init method. + 3. in case of fast optimal params searching you can drop y_train. + 4. all parameters are saved in self. parameters + + Args: + x_train: array of time-series values. + y_train: array of true labels. + + Returns: + self model object + """ + if self.parameters.get("is_fast_parameter_selection"): + super().__init__(time_series=x_train, wss_algorithm=self.parameters.get("fast_optimize_algorithm")) + sequence_window = self.runner_wss()[0] + self.parameters["sequence_window"] = sequence_window + if self.parameters.get("queue_window") is None: + queue_window = 10 + self.parameters["queue_window"] = queue_window + if self.parameters.get("lag") is None: + lag = sequence_window // 4 + self.parameters["lag"] = lag + else: + raise NotImplementedError("Any other optimization are not implemented yet! Select is_fast_optimize = True") + return self + + def get_distances(self, target_array: np.array) -> np.ndarray: + """ Apply custom method to get residuals from time series data. + + Notes: + Of course it is expected that data timeline has no missing points. + + Args: + target_array: 1-d time series data values. + + + Returns: + array of residuals shaped as initial ts. + """ + if target_array.shape[0] <= 10: + raise ArithmeticError("x_array shape is equal or lower to 10! It dose`t make sense at all.") + ... + return target_array + + def predict(self, target_array: np.array) -> np.ndarray: + """ Change Point Detection based on failure statistics. + + Notes: + 1. By default, we expect that threshold value can be found via quantile value due the fact that CPs shape are + less time series shape. + 2. Keep in mind that queue window algorithm always saves the first anomaly as true result and + drop others based on queue window range. + + Returns: + array of binary change points labels. + """ + ... diff --git a/build/lib/models/ProbabilityBased.py b/build/lib/models/ProbabilityBased.py new file mode 100644 index 0000000..823d483 --- /dev/null +++ b/build/lib/models/ProbabilityBased.py @@ -0,0 +1,189 @@ +import numpy as np +import pandas as pd + +import sys + +sys.path.append("..") +from models.ModelConstructors import ChangePointDetectionConstructor + + +class KalmanFilter(ChangePointDetectionConstructor): + """ Idea is to find data deviations based on Kalman extrapolation for nearest data. 
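+
+    Example:
+        A minimal sketch mirroring the __main__ block at the bottom of this file (synthetic data only):
+
+        >>> from data.SythData import SinusoidWaves
+        >>> ts = SinusoidWaves(length_data=1000, cps_number=5, white_noise_level="min").get()['x'].values
+        >>> model = KalmanFilter(queue_window=10).fit(x_train=list(ts), y_train=None)
+        >>> cps_pred = model.predict(ts)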
+ """ + + def __init__(self, + sequence_window: int = None, + queue_window: int = None, + is_cps_filter_on: bool = True, + is_quantile_threshold: bool = False, + is_fast_parameter_selection: bool = True, + is_cumsum_applied: bool = True, + is_z_normalization: bool = True, + is_squared_residual: bool = True, + fast_optimize_algorithm: str = 'summary_statistics_subsequence', + threshold_quantile_coeff: float = 0.95, + threshold_std_coeff: float = 3.1, + kalman_power_coeff: float = 0.9): + super().__init__(queue_window=queue_window, + sequence_window=sequence_window, + fast_optimize_algorithm=fast_optimize_algorithm, + is_cps_filter_on=is_cps_filter_on, + is_fast_parameter_selection=is_fast_parameter_selection, + threshold_std_coeff=threshold_std_coeff, + is_cumsum_applied=is_cumsum_applied, + is_z_normalization=is_z_normalization, + is_squared_residual=is_squared_residual + ) + self.parameters["is_quantile_threshold"] = is_quantile_threshold + self.parameters["threshold_quantile_coeff"] = threshold_quantile_coeff + self.parameters["kalman_power_coeff"] = kalman_power_coeff + + @staticmethod + def get_array_info(array_slice: np.array) -> tuple[float, float]: + """ Get gaussian stats based on an array of values. + + Args: + array_slice: slice of time series values. + + Returns: + gaussian stats tuple as mean and std values + """ + gaussian_info = np.mean(array_slice), np.std(array_slice) + return gaussian_info + + @staticmethod + def gaussian_multiply(g1: tuple[float, float], g2: tuple[float, float]) -> tuple[float, float]: + """ Update gaussian stats based on prev info and current status. + + Notes: + first index is mean value + second index is var value + + TO DO: + Kalman Gain. + + Args: + g1: past gaussian stats. + g2: current gaussian stats. + + Returns: + likelihood gaussian statistics. + """ + mean = (g1[1] * g2[0] + g2[1] * g1[0]) / (g1[1] + g2[1]) + variance = (g1[1] * g2[1]) / (g1[1] + g2[1]) + return mean, variance + + @staticmethod + def forecast(mean_gauss: float, std_gauss: float) -> float: + """ forecast next values based on estimated gaussian coefficient. + + Args: + mean_gauss: expected mean value. + std_gauss: expected std value. + + Returns: + normal generated value based on expected mean and std. + """ + return np.random.normal(loc=mean_gauss, scale=std_gauss) + + @staticmethod + def update(mult_gaussian: tuple[float, float], actual_gaussian: tuple[float, float]) -> tuple[float, float]: + """ Update gaussian stats based on actual and kalman filter info. + + Args: + mult_gaussian: mean and var values based on Kalman Filter. + actual_gaussian: mean and var values based on actual time series info. + + Returns: + expected mean and var stat. + """ + return mult_gaussian[0] + actual_gaussian[0], mult_gaussian[1] + actual_gaussian[1] + + def get_distances(self, target_array: np.array) -> np.ndarray: + """ Generate residuals based on gaussian forecasted values. + + Notes: + By default, expected that array shape will be more ore equal to 100 values - (up to window size). + + Args: + target_array: 1-d time series data values. + + Returns: + array of residuals between generated data and real values. 
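+            The first `sequence_window` entries are zero (the forecast list is seeded with the observed
+            values), and later entries are stochastic because each forecast is drawn with np.random.normal.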
+ """ + super().get_distances(target_array=target_array) + gaussian_forecasted_list = [val for val in target_array[:self.parameters.get("sequence_window")]] + gaussian_likelihood = self.get_array_info(target_array) + dp = [val for val in target_array[:self.parameters.get("sequence_window")]] + for generation_epoch in range(self.parameters.get("sequence_window"), target_array.shape[0]): + gaussian_forecasted_list.append(self.forecast(mean_gauss=gaussian_likelihood[0], + std_gauss=np.sqrt(gaussian_likelihood[1]))) + actual_gaussian = self.get_array_info(dp) + mult_gaussian = self.gaussian_multiply( + g1=gaussian_likelihood, + g2=actual_gaussian + ) + gaussian_likelihood = self.update(mult_gaussian, actual_gaussian) + dp.pop(0) + dp.append(target_array[generation_epoch]) + return np.array(gaussian_forecasted_list) - target_array + + def predict(self, target_array: np.array) -> np.ndarray: + """ Change Point Detection based on failure statistics. + + Notes: + 1. By default, we expect that threshold value can be found via quantile value due the fact that CPs shape are + less time series shape. + 2. Keep in mind that queue window algorithm always saves the first anomaly as true result and + drop others based on queue window range. + + Returns: + array of binary change points labels. + """ + residuals = abs(target_array - abs(self.get_distances(target_array))) + if self.parameters.get("is_z_normalization"): + residuals = self.z_normalization(residuals) + if self.parameters.get("is_squared_residual"): + residuals = residuals**2 + if self.parameters.get('is_cumsum_applied'): + alarm_index = self.cumsum(residuals)[2] + cps_list = np.zeros_like(residuals) + for index in alarm_index: + cps_list[index] = 1 + else: + dp = [val for val in residuals[:self.parameters.get("queue_window")]] + cps_list = [0 for ind in range(self.parameters.get("sequence_window"))] + mean_val = np.mean(dp) + std_val = np.std(dp) * self.parameters.get("threshold_std_coeff") + for val in residuals[self.parameters.get("sequence_window"):]: + if val > (mean_val + std_val) or val < (mean_val - std_val): + cps_list.append(1) + else: + cps_list.append(0) + dp.append(val) + dp.pop(0) + mean_val = np.mean(dp) + std_val = np.std(dp) * self.parameters.get("threshold_std_coeff") + if self.parameters.get("is_cps_filter_on"): + cps_list = self.queue(time_series=cps_list, + queue_window=self.parameters.get("queue_window"), + reversed=False) + return np.array(cps_list) + + +if __name__ == "__main__": + from models.ProbabilityBased import KalmanFilter + from data.SythData import SinusoidWaves + + data = SinusoidWaves(length_data=1000, cps_number=5, white_noise_level="min").get() + target_array = data['x'].values + + model = KalmanFilter(sequence_window=None, + is_fast_parameter_selection=True, + fast_optimize_algorithm='highest_autocorrelation', + queue_window=10, + threshold_quantile_coeff=0.95, + is_cps_filter_on=True).fit(x_train=list(target_array), y_train=None) + + cps_pred = model.predict(target_array) + stop = 0 diff --git a/build/lib/models/SubspaceBased.py b/build/lib/models/SubspaceBased.py new file mode 100644 index 0000000..c8ba3dd --- /dev/null +++ b/build/lib/models/SubspaceBased.py @@ -0,0 +1,216 @@ +from typing import Tuple +import numpy as np +from scipy.linalg import hankel +import sys + +sys.path.append("..") +from models.ModelConstructors import ChangePointDetectionConstructor + + +class SingularSequenceTransformer(ChangePointDetectionConstructor): + """ Idea is to find nearest change points based on abnormal subspace 
distance. + """ + + def __init__(self, + sequence_window: int = None, + queue_window: int = None, + n_components: int = 1, + lag: int = None, + is_cps_filter_on: bool = True, + is_quantile_threshold: bool = False, + is_exp_squared: bool = False, + is_fast_parameter_selection: bool = True, + is_cumsum_applied: bool = True, + is_z_normalization: bool = True, + is_squared_residual: bool = True, + fast_optimize_algorithm: str = 'summary_statistics_subsequence', + threshold_quantile_coeff: float = 0.91, + threshold_std_coeff: float = 3.61, + ): + """ + + Args: + n_components: PCA components number describes changes in time-data (usually we have 1,2 or 3). + sequence_window: window which we need to analyse each step over time series. + queue_window: min distance between two change points. + is_cps_filter_on: should we use queue window algorithm. + is_quantile_threshold: should we take quantile value as threshold. + lag: distance between two nearest matrix. + threshold_quantile_coeff: threshold coefficient for quantile. + threshold_std_coeff: threshold coefficient based on rule of thumb for normal distribution. + """ + + super().__init__(queue_window=queue_window, + sequence_window=sequence_window, + fast_optimize_algorithm=fast_optimize_algorithm, + is_cps_filter_on=is_cps_filter_on, + is_fast_parameter_selection=is_fast_parameter_selection, + threshold_std_coeff=threshold_std_coeff, + is_cumsum_applied=is_cumsum_applied, + is_z_normalization=is_z_normalization, + is_squared_residual=is_squared_residual + ) + self.parameters["is_quantile_threshold"] = is_quantile_threshold + self.parameters["threshold_quantile_coeff"] = threshold_quantile_coeff + self.parameters["n_components"] = n_components + self.parameters["lag"] = lag + self.parameters["threshold_quantile_coeff"] = threshold_quantile_coeff + # should we use exponential squared function for subspace distance + self.parameters["is_exp_squared"] = is_exp_squared + + @staticmethod + def get_hankel_matrix(sequence: np.array) -> np.ndarray: + """ Apply Hankel method over 1D time-series subsequence to transform it into matrix view. + + Arg: + sequence: time-series subsequence. + + Return: + Hankel matrix. + """ + return hankel(c=sequence) + + @staticmethod + def _sst_svd(x_test: np.array, x_history: np.array, n_components: int): + """Apply singular sequence transformation algorithm with SVD. + + Args: + x_test: matrix which represents time-series subsequence in the target time. + x_history: matrix which represents time-series subsequence in the past. + n_components: PCA components number describes changes in time-data (usually we have 1,2 or 3). + + Return: + distance between compared matrices. + """ + u_test, s_test, _ = np.linalg.svd(x_test, full_matrices=False) + u_history, s_hist, _ = np.linalg.svd(x_history, full_matrices=False) + s_cov = u_test[:, :n_components].T @ u_history[:, :n_components] + u_cov, s, _ = np.linalg.svd(s_cov, full_matrices=False) + return 1 - s[0] + + def get_current_matrix(self, ts: np.array) -> np.ndarray: + """ Calculate historical matrix based on lag between past and future. + + Args: + ts: target 1d sequence. + + Returns: + array of historical matrix. 
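+            Each entry is a square sequence_window x sequence_window Hankel matrix, and
+            len(ts) - lag - sequence_window matrices are produced in total.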
+ """ + list_matrix = [] + for ind in range(ts.shape[0] - self.parameters.get("lag") - self.parameters.get("sequence_window")): + list_matrix.append(self.get_hankel_matrix(ts[ind:ind + self.parameters.get("sequence_window")])) + return np.array(list_matrix) + + def get_lagged_matrix(self, ts: np.array) -> np.ndarray: + """ Calculate future matrix based on lag between past and future. + + Args: + ts: target 1d sequence. + + Returns: + array of future matrix. + """ + list_matrix = [] + for ind in range(ts.shape[0] - self.parameters.get("lag") - self.parameters.get("sequence_window")): + list_matrix.append(self.get_hankel_matrix(ts[ind + self.parameters.get("lag"): + ind + self.parameters.get("lag") + + self.parameters.get("sequence_window")])) + return np.array(list_matrix) + + def preprocess_ts(self, ts: np.array) -> Tuple[np.ndarray, np.ndarray]: + """ Preprocess historical and future matrix based on array. + + Args: + ts: target 1d sequence. + + Returns: + tuple of arrays with historical and future matrix in each time step. + """ + present_matrix = self.get_current_matrix(ts) + lagged_matrix = self.get_lagged_matrix(ts) + return present_matrix, lagged_matrix + + def get_distances(self, target_array: np.array) -> np.ndarray: + """ Calculate subspace distances. + + Notes: + By default, this pipline based on SST SVD idea. + + Args: + target_array: target 1d time-series. + + Returns: + array of subspace distance score. + """ + score_list = np.zeros_like(target_array) + matrix_history, matrix_next = self.preprocess_ts(target_array) + counter: int = 0 + while counter != target_array.shape[0] - self.parameters.get("lag") - self.parameters.get("sequence_window"): + score_list[counter] = self._sst_svd(x_test=matrix_next[counter], + x_history=matrix_history[counter], + n_components=self.parameters.get("n_components")) + counter += 1 + if self.parameters.get("is_exp_squared"): + score_list = np.exp(score_list) ** 2 + return score_list + + def predict(self, target_array: np.array) -> np.ndarray: + """ Change Point Detection based on failure statistics. + + Notes: + 1. By default, we expect that threshold value can be found via quantile value due the fact that CPs shape are + less time series shape. + 2. Keep in mind that queue window algorithm always saves the first anomaly as true result and + drop others based on queue window range. + + Returns: + array of binary change points labels. 
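+
+        Example:
+            A minimal sketch mirroring the __main__ block at the bottom of this file, where `ts` is a
+            1-d numpy array (e.g. the 'x' column produced by the data.SythData generators):
+
+            >>> model = SingularSequenceTransformer(n_components=2, queue_window=10).fit(x_train=list(ts), y_train=None)
+            >>> cps_pred = model.predict(target_array=ts)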
+ """ + residuals = abs(self.get_distances(target_array)) + if self.parameters.get("is_z_normalization"): + residuals = self.z_normalization(residuals) + if self.parameters.get('is_cumsum_applied'): + alarm_index = self.cumsum(residuals)[2] + cps_list = np.zeros_like(residuals) + for index in alarm_index: + cps_list[index] = 1 + else: + dp = [val for val in residuals[:self.parameters.get("queue_window")]] + cps_list = [0 for ind in range(self.parameters.get("sequence_window"))] + mean_val = np.mean(dp) + std_val = np.std(dp) * self.parameters.get("threshold_std_coeff") + for val in residuals[self.parameters.get("sequence_window"):]: + if val > (mean_val + std_val) or val < (mean_val - std_val): + cps_list.append(1) + else: + cps_list.append(0) + dp.append(val) + dp.pop(0) + mean_val = np.mean(dp) + std_val = np.std(dp) * self.parameters.get("threshold_std_coeff") + if self.parameters.get("is_cps_filter_on"): + cps_list = self.queue(time_series=cps_list, + queue_window=self.parameters.get("queue_window"), + reversed=False) + return np.array(cps_list) + + +if __name__ == "__main__": + sys.path.append("../..") + from data.SythData import SinusoidWaves + + data = SinusoidWaves(length_data=2000, cps_number=4, white_noise_level="min").get() + target_array = data['x'].values + model = SingularSequenceTransformer( + sequence_window=None, + lag=None, + queue_window=10, + is_cps_filter_on=True, + n_components=2, + is_fast_parameter_selection=True, + fast_optimize_algorithm='highest_autocorrelation', + threshold_std_coeff=2.65).fit(x_train=list(target_array), y_train=None) + # distances = model.get_distances(target_array=target_array) + cps_pred = model.predict(target_array=target_array) + stop = 0 diff --git a/build/lib/models/__init__.py b/build/lib/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/optimization/WSSAlgorithms.py b/build/lib/optimization/WSSAlgorithms.py new file mode 100644 index 0000000..9efb8cd --- /dev/null +++ b/build/lib/optimization/WSSAlgorithms.py @@ -0,0 +1,221 @@ +from typing import Union + +import numpy as np +from scipy.signal import argrelextrema, find_peaks + + +class WindowSizeSelection: + """Window Size Selection class. Whole-Series-Based (WSB) and subsequence-based (SB) algorithms. + + time_series: it can be a list or np.array() + time series sequences + window_max: int + maximum window length to check (O(n)!). Default is len(time_series)/2 + window_min: int + minimum window length to check (O(n)!). Default is 1 + wss_algorithm: str + type of WSS algorithm. It can be 'highest_autocorrelation', 'dominant_fourier_frequency', + 'summary_statistics_subsequence' or 'multi_window_finder'. + By default it is 'dominant_fourier_frequency' + Reference: + (c) "Windows Size Selection in Unsupervised Time Series Analytics: A Review and Benchmark. 
Arik Ermshaus, + Patrick Schafer, and Ulf Leser" + """ + + def __init__(self, + wss_algorithm: str = 'dominant_fourier_frequency', + time_series: Union[list, np.array] = None, + window_max: int = None, + window_min: int = None): + + self.dict_methods = {'highest_autocorrelation': self.autocorrelation, + 'multi_window_finder': self.multi_window_finder, + 'dominant_fourier_frequency': self.dominant_fourier_frequency, + 'summary_statistics_subsequence': self.summary_statistics_subsequence} + + self.time_series = time_series + self.window_max = window_max + self.window_min = window_min + self.wss_algorithm = wss_algorithm + self.length_ts = len(time_series) + + if self.window_max is None: + window_max_option = int(len(time_series) // 5) + if window_max_option > 250: + self.window_max = 250 + else: + self.window_max = window_max_option + + if self.window_min is None: + self.window_min = 10 + + if int(len(time_series) // 2) <= 100: + self.window_max = len(time_series) + + def autocorrelation(self): + """ + Main function for the highest_autocorrelation method + + :return: a tuple of selected window size and list of scores for this method + """ + list_score = [self.high_ac_metric(self.time_series[i:] + self.time_series[:i], i) \ + for i in range(self.window_min, self.window_max)] + selected_size_window, list_score_peaks = self.local_max_search(list_score) + return selected_size_window, list_score + + def high_ac_metric(self, copy_ts, i): + """ + Calculate metric value based on chosen chosen window size for the highest_autocorrelation method + + :param copy_ts: a list of lagged time series + :param i: temporary window size (or the lagged value) + :return: score for selected window size + """ + temp_len = len(copy_ts) + temp_coef = 1 / (temp_len - i) + mean_ts = np.mean(copy_ts) + std_ts = np.std(copy_ts) + score_list = [(copy_ts[g] - mean_ts) * (self.time_series[g] - mean_ts) / (std_ts ** 2) for g in range(temp_len)] + a_score = max(score_list) * temp_coef + return a_score + + def local_max_search(self, score_list): + """ + Find global max value id for the highest_autocorrelation method. 
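+        Peaks in the score curve are located with scipy.signal.find_peaks; the window size whose score is
+        the highest among those peaks is returned (the index is shifted by window_min to convert it back
+        to a window length).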
+ + :param score_list: a list of scores obtained + :return: a tuple of window_size_selected and list_score + """ + list_probably_peaks = find_peaks(score_list)[0][1:] + list_score_peaks = [score_list[i] for i in list_probably_peaks] + max_score = max(list_score_peaks) + window_size_selected = score_list.index(max_score) + self.window_min + return window_size_selected, list_score_peaks + + def dominant_fourier_frequency(self): + """ + Main function for the dominant_fourier_frequency + + :return: a tuple of window_size_selected and list_score + """ + list_score_k = [] + for i in range(self.window_min, self.window_max): + coeff_temp = self.coeff_metrics(i) + list_score_k.append(coeff_temp) + max_score = max(list_score_k) # take max value for selected window + window_size_selected = list_score_k.index(max_score) + self.window_min + return window_size_selected, list_score_k + + def coeff_metrics(self, temp_size): + """ + Find score coefficient for the dominant_fourier_frequency + + :param temp_size: temporary selected window size + :return: a score metric distance + """ + + length_n = len(self.time_series) + temp_list_coeff = [ts * np.exp((-2) * np.pi * (complex(0, -1)) * index * temp_size / length_n) \ + for index, ts in enumerate(self.time_series)] + complex_coeff = sum(temp_list_coeff) + score_coeff = np.sqrt(complex_coeff.real ** 2 + complex_coeff.imag ** 2) + return score_coeff + + def summary_statistics_subsequence(self): + """ + Main function for the summary_statistics_subsequence + + :return: selected window size and a list of score + """ + ts = (self.time_series - np.min(self.time_series)) / (np.max(self.time_series) - np.min(self.time_series)) + + stats_ts = [np.mean(ts), np.std(ts), np.max(ts) - np.min(ts)] + list_score = [self.suss_score(ts, window_size, stats_ts) for window_size + in range(self.window_min, self.window_max)] + window_size_selected = next(x[0] for x in enumerate(list_score) if x[1] > 0.89) + self.window_min + return window_size_selected, list_score + + def stats_diff(self, ts, window_size, stats_ts): + """ + Find difference between global statistic and statistic of subsequnces with different window + for the summary_statistics_subsequence + + :param ts: time series data + :param window_size: temporary selected window size + :param stats_ts: statistic over all ts for calculations + :return: not normalized euclidian distance between selected window size and general statistic for ts + """ + stat_w = [[np.mean(ts[i:i + window_size]), np.std(ts[i:i + window_size]), + np.max(ts[i:i + window_size]) - np.min(ts[i:i + window_size])] for i in range(self.length_ts)] + stat_diff = [[(1 / window_size) * np.sqrt((stats_ts[0] - stat_w[i][0]) ** 2 \ + + (stats_ts[1] - stat_w[i][1]) ** 2 \ + + (stats_ts[2] - stat_w[i][2]) ** 2)] for i in range(len(stat_w))] + return np.mean(stat_diff) + + def suss_score(self, ts, window_size, stats_ts): + """ + Find score coefficient for the the summary_statistics_subsequence + + :param ts: time series data + :param window_size: temporary selected window size + :param stats_ts: statistic over all ts for calculations + :return: normalized euclidian distance between selected window size and general statistic for ts + """ + s_min, s_max = self.stats_diff(ts, len(ts), stats_ts), self.stats_diff(ts, 1, stats_ts) + score = self.stats_diff(ts, window_size, stats_ts) + score_normalize = (score - s_min) / (s_max - s_min) + return 1 - score_normalize + + def multi_window_finder(self): + """ + Main function for multi_window_finder method + + :return: selected 
window size and a list of scores for this method + """ + distance_scores = [self.mwf_metric(i) for i in range(self.window_min, self.window_max)] + minimum_id_list, id_max = self.top_local_minimum(distance_scores) + print(minimum_id_list) + window_size_selected = minimum_id_list[1] * 10 + self.window_min + id_max + return window_size_selected, distance_scores + + def mwf_metric(self, window_selected_temp): + """ + Find multi_window_finder method metric value for a chosen window size + + :param window_selected_temp: temporary window selected + :return: value which is the MWF distance metric + """ + coeff_temp = 1 / window_selected_temp + m_values = [] + for k in range(self.length_ts - window_selected_temp + 1): + m_k = [self.time_series[g + k] for g in range(window_selected_temp - 1)] + m_values.append(coeff_temp * sum(m_k)) + distance_k = sum(np.log10(abs(m_values - np.mean(m_values)))) + return distance_k + + def top_local_minimum(self, distance_scores): + """ + Find a list of local minimum for multi_window_finder method + + :param distance_scores: list of distance scores from mwf_metric + :return: list of index where narray has minimum, max id for distance_scores list + """ + id_max = distance_scores.index(max(distance_scores)) + score_temp = distance_scores[id_max:] + number_windows_temp = len(score_temp) // 10 + scorer_list = [sum(abs(score_temp[i:i + 10] \ + - np.mean(score_temp[i:i + 10]))) \ + for i in range(number_windows_temp)] + id_local_minimum_list = argrelextrema(np.array(scorer_list), np.less)[0] + return id_local_minimum_list, id_max + + def runner_wss(self): + if int(len(self.time_series)) <= self.window_min: + window_size_selected, list_score = int(len(self.time_series)), [] + else: + window_size_selected, list_score = self.dict_methods[self.wss_algorithm]() + if window_size_selected > self.window_max: + window_size_selected = self.window_max + if window_size_selected < self.window_min: + window_size_selected = self.window_min + return window_size_selected, list_score diff --git a/build/lib/optimization/__init__.py b/build/lib/optimization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/utils/DataTransformers.py b/build/lib/utils/DataTransformers.py new file mode 100644 index 0000000..378e8f3 --- /dev/null +++ b/build/lib/utils/DataTransformers.py @@ -0,0 +1,119 @@ +import numpy as np +from scipy.signal import savgol_filter +from detecta import detect_cusum +from typing import Any + + +class Scaler: + """ 1D timeseries normalization/reverse normalization class. + """ + + def __init__(self): + self.stat_info = None + + @staticmethod + def info(ts: np.array) -> np.array: + """ Time series statistical information. + + Returns: + list of stst info as well as mean, std, var, quantile 25%, 50%, 75%. 
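+            Index order of the returned array: [0] max, [1] min, [2] mean, [3] std, [4] var,
+            [5] quantile 25%, [6] quantile 50%, [7] quantile 75%.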
+ """ + max_val = ts.max() + min_val = ts.min() + mean_val = ts.mean() + std_val = ts.std() + var_val = ts.var() + quantile_25 = np.quantile(ts, q=0.25) + quantile_50 = np.quantile(ts, q=0.50) + quantile_75 = np.quantile(ts, q=0.75) + return np.array([max_val, min_val, mean_val, + std_val, var_val, quantile_25, + quantile_50, quantile_75]) + + def linear_normalization(self, ts: np.array) -> np.array: + self.stat_info = self.info(ts) + normalized_ts = (ts - self.stat_info[1]) / (self.stat_info[0] - self.stat_info[1]) + return normalized_ts + + def reversed_linear_normalization(self, ts: np.array) -> np.array: + reversed_ts = ts * (self.stat_info[0] - self.stat_info[1]) + self.stat_info[2] + return reversed_ts + + @staticmethod + def log_normalization(ts: np.array) -> np.array: + normalized_ts = np.log2(ts) + return normalized_ts + + @staticmethod + def reversed_log_normalization(transformed_ts: np.array): + reversed_ts = 2 ** transformed_ts + return reversed_ts + + def z_normalization(self, ts: np.array) -> np.array: + self.stat_info = self.info(ts) + normalized_ts = (ts - self.stat_info[2]) / self.stat_info[3] + return normalized_ts + + def reverse_z_normalization(self, normalized_ts: np.array) -> np.array: + reversed_ts = normalized_ts * self.stat_info[3] + self.stat_info[1] + return reversed_ts + + +class Filter: + @staticmethod + def savgol(x: np.array, window_length: int, + polyorder: int = 3, mode: str = "nearest") -> np.array: + """ Savitsky-Golay filter implementation. + """ + return savgol_filter(x=x, window_length=window_length, polyorder=polyorder, mode=mode) + + @staticmethod + def queue(queue_window: int = 10, time_series: list[int] = None, reversed: bool = False) -> np.array: + """ Filter time series based on nearest value distant. + + Notes: + By default this function expect that array contains only binary values. + + Args: + queue_window: minimum distance between two values in your array. + time_series: array of values. + reversed: should we select the last anomaly as true positive and drop others based on window. + + Returns: + filtered array where minimum distance between nearest value preserved. + """ + if reversed: + time_series = time_series[::-1] + queue_list: list[int] = [0 for i in range(queue_window)] + filtered_score: list[int] = [] + for i in range(len(time_series)): + value = time_series[i] + if max(queue_list) != 0: + filtered_score.append(0) + queue_list.pop(0) + queue_list.append(0) + else: + filtered_score.append(value) + queue_list.pop(0) + queue_list.append(value) + return np.array(filtered_score) + + def cumsum(self, x: np.array) -> tuple[Any, Any, Any, Any]: + """ cumulative sum algorithm implementation to find abnormal behaviours in data. + + Notes: + By default x expect to contain residuals value between predicted values and real. + + Args: + x: array of values. + + Returns: + info arrays about change points. + """ + self.stat_info = self.info(x) + ending, start, alarm, cumsum = detect_cusum(x=x, + threshold=(self.stat_info[2] + self.stat_info[3] * 3), + drift=self.stat_info[3], + ending=True, + show=False) + return ending, start, alarm, cumsum diff --git a/build/lib/utils/Reports.py b/build/lib/utils/Reports.py new file mode 100644 index 0000000..1e638e9 --- /dev/null +++ b/build/lib/utils/Reports.py @@ -0,0 +1,81 @@ +import pandas as pd +from tsad.utils.evaluating.evaluating import evaluating +from typing import Any, Tuple + + +class SummaryReport: + """ Idea is to create fast report for CPs task. 
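+
+    Example:
+        A sketch only; `df` is assumed to carry a timestamp index and binary (0/1) columns named as below,
+        matching the frame built in utils.StreamlitFunctions:
+
+        >>> report = SummaryReport().create_report(df, column_name_preds="cps_preds",
+        ...                                        column_name_original="CPs")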
+ """ + def __init__(self, + metric: str = "average_time", + window_width: str = "30 seconds", + portion: float = 0.1, + verbose: bool = False): + self.metric = metric + self.window_width = window_width + self.portion = portion + self.verbose = verbose + + def _evaluating(self, true: pd.Series, prediction: pd.Series) -> Tuple[Any]: + """ Calculate CPD results based on TSAD library. + + Args: + true: pd.Series of original data [0 or 1] + prediction: pd.Series of preds data [0 or 1] + + Returns: + tuple of metrics value. + """ + return evaluating(true=true, + prediction=prediction, + metric=self.metric, + window_width=self.window_width, + portion=self.portion, + verbose=self.verbose) + + def create_report(self, df: pd.DataFrame, column_name_preds: str, column_name_original: str) -> pd.DataFrame: + """ Generate one row report. + + Args: + df: dataframe with preds and original data where index is timestamp data. + column_name_preds: name of column where expected predicted CPs values. + column_name_original: name of column where expected original CPs values. + + Returns: + summary dataframe + """ + true = df[column_name_original] + preds = df[column_name_preds] + res = self._evaluating(true, preds) + if self.metric == "average_time": + columns_out = ["Average time (average delay)", + "Missing changepoints", + "FPs", + "Number of true change-points"] + out = pd.DataFrame(columns=columns_out, data=[res]) + else: + raise NotImplementedError("Not yet any other metrics implemented") + return out + + def create_big_report(self, + list_of_df: list[pd.DataFrame], + column_name_preds: str, + column_name_original: str) -> pd.DataFrame: + """ Create one big dataframe report for many dataframes. + + Notes: + column_name_preds and column_name_original should be the same in all dataframes! + + Args: + list_of_df: list of dataframe with preds and original, data index is timestamp data. + column_name_preds: name of column where expected predicted CPs values. + column_name_original: name of column where expected original CPs values. + + Returns: + summary dataframe + """ + out = [] + for df in list_of_df: + out.append(self.create_report(df, column_name_preds, column_name_original)) + return pd.concat(out).reset_index(drop=True) + diff --git a/build/lib/utils/StreamlitFunctions.py b/build/lib/utils/StreamlitFunctions.py new file mode 100644 index 0000000..dd24f6f --- /dev/null +++ b/build/lib/utils/StreamlitFunctions.py @@ -0,0 +1,200 @@ +import numpy as np +import pandas as pd +from typing import Optional, Dict, Any +import sys +import streamlit as st +import matplotlib.pyplot as plt + +sys.path.append("../..") + +from data.SythData import LinearSteps, SinusoidWaves +from models.SubspaceBased import SingularSequenceTransformer +from models.ProbabilityBased import KalmanFilter +from utils.Reports import SummaryReport + +target_column_value_name = "x" +original_cps_name = "CPs" +predicted_cps_name = "cps_preds" +residuals_name = "residuals" + + +@st.cache_data +def data_info(data: pd.DataFrame): + """ Plot pandas dataframe for table value overview. + + Args: + data: loaded pandas dataframe. + + Returns: + None. + """ + st.dataframe(data, use_container_width=True) + + +@st.cache_data +def data_plot(data: pd.DataFrame): + """ Plot data for target raw values and original cps. + + Args: + data: loaded pandas dataframe. 
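+            It must contain the module-level column names `x` (raw values) and `CPs` (original change points).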
+ + Returns: + None + """ + + fig = plt.figure(figsize=(20, 5)) + plt.plot(data[target_column_value_name], label='Raw syth values') + plt.legend() + st.pyplot(fig=fig, use_container_width=True) + + fig_2 = plt.figure(figsize=(20, 5)) + plt.plot(data[original_cps_name], label='Original Change Points values') + plt.legend() + st.pyplot(fig=fig_2, use_container_width=True) + + +def init_model_params(model_name: str) -> Dict[str, Optional[Any]]: + """ Select params for custom model from module models. + + Returns: + dict of params for custom model. + """ + params = { + "is_cps_filter_on": st.sidebar.checkbox("is_cps_filter_on"), + "is_cumsum_applied": st.sidebar.checkbox("is_cumsum_applied"), + "queue_window": st.sidebar.slider('queue_window', 10, 100, 10), + "threshold_std_coeff": st.sidebar.slider('threshold_std_coeff', 2.1, 5.1, 3.1) + } + if model_name == "Singular Sequence Decomposition": + params["n_components"] = st.sidebar.slider('n_components PCA', 1, 3, 2), + elif model_name == "Kalman Filter": + pass + return params + + +def init_data_loader_params() -> Dict[str, Optional[Any]]: + """ Init params to loan syth data. + + Notes: + length_data should be equally split based on cps_number. + + Examples: + 1. length_data = 1000, cps_number=[2, 4, 10, 20, 50] + 2. length_data = 100, cps_number=[2, 4] + + Warnings: + cps_number should split data equally! + + Returns: + dict of params to generate syth data. + """ + params = { + "length_data": st.sidebar.slider('length_data', 100, 1000, 500), + "cps_number": st.sidebar.slider('cps_number', 2, 100, 5), + "white_noise_level": st.sidebar.radio( + "What's noise level you want to select", + ["min", "default", "max"], + index=0, + ) + } + return params + + +@st.cache_data +def data_loader(option: str, params: dict) -> Optional[pd.DataFrame]: + """ Load data based on option query. + + Args: + option: option name. + params: dict of params to generate syth data if any. + + Returns: + dataframe or None in case of none option. + """ + with st.spinner(text="Loading data in progress..."): + if option == "Syth-Steps": + data = LinearSteps(length_data=params.get("length_data"), + cps_number=params.get("cps_number"), + white_noise_level=params.get("white_noise_level")).get() + elif option == 'Syth-Sinusoid': + data = SinusoidWaves(length_data=params.get("length_data"), + cps_number=params.get("cps_number"), + white_noise_level=params.get("white_noise_level")).get() + else: + data = None + return data + + +@st.cache_data +def init_model_pipeline(name_model: str, params: dict, df: pd.DataFrame) -> Optional[pd.DataFrame]: + """ Model pipeline over syth data. + + Args: + name_model: selected model from UI. + params: selected initial params. + df: syth data. + + Returns: + dataframe with change points predicted and residuals. + """ + df_new = df.copy() + with st.spinner(text=f"Loading model {name_model} pipeline over syth data. 
Just wait..."): + cps_preds = None + residuals = None + if name_model == "Kalman Filter": + model = KalmanFilter(is_cps_filter_on=params.get("is_cps_filter_in"), + is_cumsum_applied=params.get("is_cumsum_applied"), + queue_window=params.get("queue_window"), + is_z_normalization=True, + is_squared_residual=True, + threshold_std_coeff=params.get("threshold_std_coeff")).fit(list(df.x), None) + cps_preds = model.predict(df.x.values) + residuals = model.get_distances(df.x.values) + elif name_model == "Singular Sequence Decomposition": + model = SingularSequenceTransformer( + is_cps_filter_on=params.get("is_cps_filter_in"), + is_cumsum_applied=params.get("is_cumsum_applied"), + is_z_normalization=True, + is_squared_residual=True, + n_components=2, + threshold_std_coeff=params.get("threshold_std_coeff")).fit(list(df.x), None) + cps_preds = model.predict(df.x.values) + residuals = model.get_distances(df.x.values) + df_new[predicted_cps_name] = cps_preds + df_new[residuals_name] = residuals + return df_new + + +@st.cache_data +def model_summary(df: pd.DataFrame) -> pd.DataFrame: + """ Calculate summary for output model data. + + Args: + df: dataframe where collected cps_preds and residuals info as well as original data. + + Returns: + summary dataframe. + """ + return SummaryReport().create_report(df=df, + column_name_preds="cps_preds", + column_name_original="CPs" + ) + + +def plot_results(df_summary_: pd.DataFrame): + """ Plot data for residuals and predicted CPs versus original data. + + Args: + df_summary_: result dataframe. + """ + + fig = plt.figure(figsize=(20, 5)) + plt.plot(df_summary_[residuals_name], label='Generated residuals based on model') + plt.legend() + st.pyplot(fig=fig, use_container_width=True) + + fig_2 = plt.figure(figsize=(20, 5)) + plt.plot(df_summary_[original_cps_name], label='Original Change Points values', color='green') + plt.plot(df_summary_[predicted_cps_name], label='Predicted Change Points values', color='red') + plt.legend() + st.pyplot(fig=fig_2, use_container_width=True) diff --git a/build/lib/utils/__init__.py b/build/lib/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py index e69de29..d1b651d 100644 --- a/setup.py +++ b/setup.py @@ -0,0 +1,38 @@ +__version__ = "0.0.1" + +from setuptools import setup, find_packages + +setup( + name='dd_cpd', + version=__version__, + python_requires='>=3.10.0', + url='https://github.com/GishB/DirectionalDrillingChangePointDetection', + license='GNU GPLv3', + packages=find_packages(exclude=[ + 'tests', + 'experiments', + '.github', + '.git', + '__pycache__', + '.pytest_cache', + '.idea', + '.git', + 'gitattributes' + ]), + author='Aleksandr Samofalov', + author_email='SamofalovWORK@yandex.ru', + description='Time Series Change Point Detection ' + 'to increase perfomance of Directional Drilling Processes at Oil and Gas Fields', + long_description=open('./README.md').read(), + install_requires=[ + 'pandas~=1.5.3', + 'pytest~=8.1.1', + 'numpy~=1.25.0', + 'streamlit<=1.31.0', + 'scipy~=1.11.4', + 'matplotlib~=3.7.1', + 'requests~=2.31.0', + 'detecta<=0.0.5' + 'tsad==0.19.3' + ], + zip_safe=False) diff --git a/tests/test_ProbabilityBased.py b/tests/test_ProbabilityBased.py index c9f23f0..31b1d71 100644 --- a/tests/test_ProbabilityBased.py +++ b/tests/test_ProbabilityBased.py @@ -1,10 +1,4 @@ -import pytest -import sys - import numpy as np - -sys.path.append("..") - from models.ProbabilityBased import KalmanFilter diff --git a/tests/test_SubspaceBased.py b/tests/test_SubspaceBased.py index 
2a34af2..9a10241 100644 --- a/tests/test_SubspaceBased.py +++ b/tests/test_SubspaceBased.py @@ -1,11 +1,5 @@ -import pytest -import sys -from scipy.linalg import hankel - import numpy as np - -sys.path.append("..") - +from scipy.linalg import hankel from models.SubspaceBased import SingularSequenceTransformer