diff --git a/baro/_bocpd.py b/baro/_bocpd.py
index a9f9907..e07b042 100644
--- a/baro/_bocpd.py
+++ b/baro/_bocpd.py
@@ -1,5 +1,6 @@
-# The code is shamelessly copied and developed from
+# Acknowledgement: The code is copied and developed from
 # https://github.com/hildensia/bayesian_changepoint_detection/tree/master/bayesian_changepoint_detection
+
 from abc import ABC, abstractmethod
 import numpy as np
diff --git a/baro/anomaly_detection.py b/baro/anomaly_detection.py
index e704103..e2c693e 100644
--- a/baro/anomaly_detection.py
+++ b/baro/anomaly_detection.py
@@ -7,7 +7,18 @@
 def nsigma(data, k=3, startsfrom=100):
     """For each time series (column) in the data, detect anomalies using the n-sigma rule.
-    Return the timestamps of the anomalies.
+
+    Parameters:
+    - data : pandas DataFrame
+        The input data containing time series columns.
+    - k : int, optional
+        The number of standard deviations from the mean to consider as an anomaly. Default is 3.
+    - startsfrom : int, optional
+        The index from which to start calculating mean and standard deviation. Default is 100.
+
+    Returns:
+    - anomalies : list
+        List of timestamps where anomalies were detected.
     """
     anomalies = []
     for col in data.columns:
@@ -25,6 +36,22 @@
 def find_anomalies(data, time_col=None,threshold=0.01):
+    """Find anomalies in the data based on a given threshold.
+
+    Parameters:
+    - data : list or numpy array
+        The input data to search for anomalies.
+    - time_col : pandas Series, optional
+        The timestamps corresponding to the data. Default is None.
+    - threshold : float, optional
+        The threshold value above which a data point is considered an anomaly. Default is 0.01.
+
+    Returns:
+    - merged_anomalies : list
+        List of merged timestamps where anomalies were detected.
+    - anomalies : list
+        List of timestamps where anomalies were detected.
+    """
     anomalies = []
     for i in range(1, len(data)):
         if data[i] > threshold:
@@ -48,6 +75,16 @@
 def bocpd(data):
+    """Perform Multivariate Bayesian Online Change Point Detection (BOCPD) on the input data.
+
+    Parameters:
+    - data : pandas DataFrame
+        The input data containing metrics from microservices.
+
+    Returns:
+    - anomalies : list
+        List of timestamps where anomalies were detected.
+    """
     from functools import partial
     from baro._bocpd import online_changepoint_detection, constant_hazard, MultivariateT
     data = data.copy()
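Usage note (not part of the patch): the new docstrings above imply the following call pattern — a minimal sketch, where the CSV path is illustrative and a `time` column plus one numeric column per metric is assumed.

```python
import pandas as pd

from baro.anomaly_detection import nsigma, bocpd

# One column per metric plus a "time" column, as the docstrings assume.
data = pd.read_csv("simple_data.csv")  # illustrative path

# n-sigma rule: flag points more than k standard deviations from the mean;
# statistics are computed starting from index `startsfrom` (see docstring).
anomalies = nsigma(data, k=3, startsfrom=100)

# Multivariate BOCPD over the same metrics; also returns anomalous timestamps.
change_points = bocpd(data)
```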
diff --git a/baro/reproducibility.py b/baro/reproducibility.py
index 55ce4f8..6964b80 100644
--- a/baro/reproducibility.py
+++ b/baro/reproducibility.py
@@ -26,6 +26,14 @@
 def reproduce_baro(dataset=None, fault=None):
+    """Reproduce BARO results for the given dataset and fault type.
+
+    Parameters:
+    - dataset : str, optional
+        The dataset to reproduce results for. Supported values are "fse-ob" (Online Boutique), "fse-ss" (Sock Shop), and "fse-tt" (Train Ticket).
+    - fault : str, optional
+        The type of fault to consider. Supported values are "cpu", "mem", "delay", "loss", or "all" (for all fault types). Default is None.
+    """
     assert dataset in ["fse-ob", "fse-ss", "fse-tt"], f"{dataset} is not supported!"
     assert fault in [None, "all", "cpu", "mem", "delay", "loss"], f"{fault} is not supported!"
     if fault is None:
@@ -90,6 +98,14 @@
 def reproduce_bocpd(dataset=None, saved=False):
+    """Reproduce Multivariate BOCPD results for the given dataset.
+
+    Parameters:
+    - dataset : str, optional
+        The dataset to reproduce results for. Supported values are "fse-ob" (Online Boutique), "fse-ss" (Sock Shop), and "fse-tt" (Train Ticket).
+    - saved : bool, optional
+        If True, load precomputed results. If False, run the BOCPD algorithm again. Default is False.
+    """
     assert dataset in ["fse-ob", "fse-ss", "fse-tt"], f"{dataset} is not supported!"
     if not os.path.exists(f"data/{dataset}"):
@@ -172,6 +188,14 @@
 def reproduce_rq4(dataset=None, eval_metric=None):
+    """Reproduce RQ4 results for the given dataset and evaluation metric.
+
+    Parameters:
+    - dataset : str, optional
+        The dataset to reproduce results for. Supported values are "fse-ob" (Online Boutique), "fse-ss" (Sock Shop), and "fse-tt" (Train Ticket).
+    - eval_metric : str, optional
+        The evaluation metric to use. Supported values are "top1", "top3", "avg5", or None (default, which is equivalent to "avg5").
+    """
     assert dataset in ["fse-ob", "fse-ss", "fse-tt"], f"{dataset} is not supported!"
     assert eval_metric in [None, "top1", "top3", "avg5"], f"{eval_metric} is not supported!"
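A hedged sketch of how these reproduction entry points are invoked, using only the signatures and argument values documented above (the datasets must already be present locally; the download helpers in `baro/utility.py` further below fetch them):

```python
from baro.reproducibility import reproduce_baro, reproduce_bocpd, reproduce_rq4

# BARO on Online Boutique, restricted to CPU faults.
reproduce_baro(dataset="fse-ob", fault="cpu")

# Multivariate BOCPD on Sock Shop, loading precomputed results rather than rerunning.
reproduce_bocpd(dataset="fse-ss", saved=True)

# RQ4 on Train Ticket with the default evaluation metric (equivalent to "avg5").
reproduce_rq4(dataset="fse-tt", eval_metric=None)
```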
diff --git a/baro/root_cause_analysis.py b/baro/root_cause_analysis.py
index d83ce6b..070a363 100644
--- a/baro/root_cause_analysis.py
+++ b/baro/root_cause_analysis.py
@@ -5,6 +5,16 @@
 def select_useful_cols(data):
+    """Select useful columns from the dataset based on certain criteria.
+
+    Parameters:
+    - data : pandas.DataFrame
+        The dataset to select columns from.
+
+    Returns:
+    - selected_cols : list
+        A list of selected column names.
+    """
     selected_cols = []
     for c in data.columns:
         # keep time
@@ -28,6 +38,16 @@
 def drop_extra(df: pd.DataFrame):
+    """Drop extra columns from the DataFrame.
+
+    Parameters:
+    - df : pandas.DataFrame
+        The DataFrame to remove extra columns from.
+
+    Returns:
+    - df : pandas.DataFrame
+        The DataFrame after removing extra columns.
+    """
     if "time.1" in df:
         df = df.drop(columns=["time.1"])
@@ -52,6 +72,16 @@
 def convert_mem_mb(df: pd.DataFrame):
+    """Convert memory values in the DataFrame to MBs.
+
+    Parameters:
+    - df : pandas.DataFrame
+        The DataFrame containing memory values.
+
+    Returns:
+    - df : pandas.DataFrame
+        The DataFrame with memory values converted to MBs.
+    """
     # Convert memory to MBs
     def update_mem(x):
         if not x.name.endswith("_mem"):
@@ -64,6 +94,20 @@
 def preprocess(data, dataset=None, dk_select_useful=False):
+    """Preprocess the dataset.
+
+    Parameters:
+    - data : pandas.DataFrame
+        The dataset to preprocess.
+    - dataset : str, optional
+        The dataset name. Default is None.
+    - dk_select_useful : bool, optional
+        Whether to select useful columns. Default is False.
+
+    Returns:
+    - data : pandas.DataFrame
+        The preprocessed dataset.
+    """
     data = drop_constant(drop_time(data))
     data = convert_mem_mb(data)
@@ -78,6 +122,28 @@
 def nsigma(data, inject_time=None, dataset=None, num_loop=None, sli=None, anomalies=None, **kwargs):
+    """Perform nsigma analysis on the dataset.
+
+    Parameters:
+    - data : pandas.DataFrame
+        The dataset to perform nsigma analysis on.
+    - inject_time : int, optional
+        The time at which anomalies were injected. Default is None.
+    - dataset : str, optional
+        The dataset name. Default is None.
+    - num_loop : int, optional
+        Number of loops. Default is None.
+    - sli : int, optional
+        SLI (Service Level Indicator). Default is None.
+    - anomalies : list, optional
+        List of anomalies. Default is None.
+    - kwargs : dict
+        Additional keyword arguments.
+
+    Returns:
+    - dict
+        A dictionary containing node names and ranks.
+    """
     if anomalies is None:
         normal_df = data[data["time"] < inject_time]
         anomal_df = data[data["time"] >= inject_time]
@@ -122,6 +188,28 @@
 def robust_scorer(
     data, inject_time=None, dataset=None, num_loop=None, sli=None, anomalies=None, **kwargs
 ):
+    """Perform root cause analysis using RobustScorer.
+
+    Parameters:
+    - data : pandas.DataFrame
+        The data to perform RobustScorer on.
+    - inject_time : int, optional
+        The fault injection time. Default is None.
+    - dataset : str, optional
+        The dataset name. Default is None.
+    - num_loop : int, optional
+        Number of loops. Default is None. Kept only for future API compatibility.
+    - sli : int, optional
+        SLI (Service Level Indicator). Default is None. Kept only for future API compatibility.
+    - anomalies : list, optional
+        List of anomalies. Default is None.
+    - kwargs : dict
+        Additional keyword arguments.
+
+    Returns:
+    - dict
+        A dictionary containing node names and ranks. `ranks` is a ranked list of root causes.
+    """
     if anomalies is None:
         normal_df = data[data["time"] < inject_time]
         anomal_df = data[data["time"] >= inject_time]
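A sketch of the RobustScorer path these docstrings describe. The file name and injection timestamp are placeholders, and the example assumes the returned dict exposes a `ranks` key, as the documented return shape suggests:

```python
from baro.utility import read_data, to_service_ranks
from baro.root_cause_analysis import robust_scorer

data = read_data("data.csv")  # placeholder path; a "time" column is expected
inject_time = 1692569339      # placeholder fault-injection timestamp

# Splits the data at inject_time internally, then ranks candidate root causes.
output = robust_scorer(data, inject_time=inject_time)
print(output["ranks"][:5])                # top-5 fine-grained (metric-level) ranks
print(to_service_ranks(output["ranks"]))  # collapsed to service-level ranks
```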
- """ + """Download the Train Ticket dataset from Zenodo.""" if local_path == None: local_path = "data" if not os.path.exists(local_path): @@ -61,20 +55,24 @@ def download_train_ticket_dataset(local_path=None): def load_json(filename: str): + """Load data from a JSON file.""" with open(filename) as f: data = json.load(f) return data def drop_constant(df: pd.DataFrame): + """Drop constant columns from the DataFrame.""" return df.loc[:, (df != df.iloc[0]).any()] def drop_near_constant(df: pd.DataFrame, threshold: float = 0.1): + """Drop columns with near-constant values from the DataFrame.""" return df.loc[:, (df != df.iloc[0]).mean() > threshold] def drop_time(df: pd.DataFrame): + """Drop time-related columns from the DataFrame.""" if "time" in df: df = df.drop(columns=["time"]) if "Time" in df: @@ -85,7 +83,7 @@ def drop_time(df: pd.DataFrame): def visualize_metrics(data: pd.DataFrame, filename=None, figsize=None): - """Visualize the metrics.""" + """Visualize metrics from the DataFrame.""" if figsize is None: figsize = (25, 25) @@ -128,7 +126,7 @@ def visualize_metrics(data: pd.DataFrame, filename=None, figsize=None): def download_data(remote_url=None, local_path=None): - """Download sample metrics data""" + """Download data from a remote URL.""" if remote_url is None: remote_url = "https://github.com/phamquiluan/baro/releases/download/0.0.4/simple_data.csv" if local_path is None: @@ -156,7 +154,7 @@ def download_data(remote_url=None, local_path=None): def read_data(data_path, strip=True): - """Read csv data for root cause analysis.""" + """Read CSV data for root cause analysis.""" data = pd.read_csv(data_path) data_dir = os.path.dirname(data_path) @@ -189,7 +187,7 @@ def read_data(data_path, strip=True): return data def to_service_ranks(ranks): - """Convert fine-grained ranking to service ranks""" + """Convert fine-grained ranking to service ranks.""" _service_ranks = [r.split("_")[0] for r in ranks] service_ranks = [] # remove duplicates @@ -206,6 +204,7 @@ def select_latency_and_error(data): return data[["time"] + latency_cols + error_cols] def find_cps(maxes): + """Find change points given a `maxes` array.""" cps = [] for i in range(1, len(maxes)): if abs(maxes[i] - maxes[i-1]) > 1: