Commit 71c54b7

Add docstring [skip ci]
phamquiluan authored Apr 30, 2024
1 parent 347a6d7 commit 71c54b7
Showing 5 changed files with 164 additions and 15 deletions.
3 changes: 2 additions & 1 deletion baro/_bocpd.py
@@ -1,5 +1,6 @@
- # The code is shamelessly copied and developed from
+ # Acknowledgement: The code is copied and developed from
# https://github.com/hildensia/bayesian_changepoint_detection/tree/master/bayesian_changepoint_detection

from abc import ABC, abstractmethod

import numpy as np
39 changes: 38 additions & 1 deletion baro/anomaly_detection.py
@@ -7,7 +7,18 @@
def nsigma(data, k=3, startsfrom=100):
"""For each time series (column) in the data,
detect anomalies using the n-sigma rule.
Return the timestamps of the anomalies.
Parameters:
- data : pandas DataFrame
The input data containing time series columns.
- k : int, optional
The number of standard deviations from the mean beyond which a point is flagged as an anomaly. Default is 3.
- startsfrom : int, optional
The index from which to start calculating mean and standard deviation. Default is 100.
Returns:
- anomalies : list
List of timestamps where anomalies were detected.
"""
anomalies = []
for col in data.columns:
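
For illustration, a minimal call matching the new docstring; the single metric column and the injected spike are invented for this sketch:

import numpy as np
import pandas as pd
from baro.anomaly_detection import nsigma

# Hypothetical frame: ~200 normal points, then an obvious spike.
rng = np.random.default_rng(0)
values = np.concatenate([rng.normal(10.0, 1.0, 200), np.full(5, 99.0)])
df = pd.DataFrame({"latency": values})
print(nsigma(df, k=3, startsfrom=100))  # timestamps flagged by the 3-sigma rule
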
@@ -25,6 +36,22 @@ def nsigma(data, k=3, startsfrom=100):


def find_anomalies(data, time_col=None, threshold=0.01):
"""Find anomalies in the data based on a given threshold.
Parameters:
- data : list or numpy array
The input data to search for anomalies.
- time_col : pandas Series, optional
The timestamps corresponding to the data. Default is None.
- threshold : float, optional
The threshold value above which a data point is considered an anomaly. Default is 0.01.
Returns:
- merged_anomalies : list
List of merged timestamps where anomalies were detected.
- anomalies : list
List of timestamps where anomalies were detected.
"""
anomalies = []
for i in range(1, len(data)):
if data[i] > threshold:
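
A quick sketch of the documented two-value return; the score series below is made up:

from baro.anomaly_detection import find_anomalies

scores = [0.0, 0.005, 0.02, 0.03, 0.0, 0.0, 0.04]  # hypothetical change scores
merged_anomalies, anomalies = find_anomalies(scores, threshold=0.01)
print(merged_anomalies, anomalies)
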
@@ -48,6 +75,16 @@ def find_anomalies(data, time_col=None, threshold=0.01):


def bocpd(data):
"""Perform Multivariate Bayesian Online Change Point Detection (BOCPD) on the input data.
Parameters:
- data : pandas DataFrame
The input data containing metrics from microservices.
Returns:
- anomalies : list
List of timestamps where anomalies were detected.
"""
from functools import partial
from baro._bocpd import online_changepoint_detection, constant_hazard, MultivariateT
data = data.copy()
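
Assuming a metrics DataFrame such as the repo's sample data (the local CSV path below is an assumption, not part of this commit), the documented call is simply:

import pandas as pd
from baro.anomaly_detection import bocpd

data = pd.read_csv("data/simple_data.csv")  # hypothetical local copy of the sample metrics
anomalies = bocpd(data)
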
24 changes: 24 additions & 0 deletions baro/reproducibility.py
@@ -26,6 +26,14 @@


def reproduce_baro(dataset=None, fault=None):
"""Reproduce BARO results for the given dataset and fault type.
Parameters:
- dataset : str, optional
The dataset to reproduce results for. Supported values are "fse-ob" (Online Boutique), "fse-ss" (Sock Shop), and "fse-tt" (Train Ticket).
- fault : str, optional
The type of fault to consider. Supported values are "cpu", "mem", "delay", "loss", or "all" (for all fault types). Default is None.
"""
assert dataset in ["fse-ob", "fse-ss", "fse-tt"], f"{dataset} is not supported!"
assert fault in [None, "all", "cpu", "mem", "delay", "loss"], f"{fault} is not supported!"
if fault is None:
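
The asserts above define the full argument space, so a valid call looks like:

from baro.reproducibility import reproduce_baro

# Online Boutique dataset, CPU faults only; fault="all" covers every fault type.
reproduce_baro(dataset="fse-ob", fault="cpu")
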
@@ -90,6 +98,14 @@ def reproduce_baro(dataset=None, fault=None):


def reproduce_bocpd(dataset=None, saved=False):
"""Reproduce Multivariate BOCPD results for the given dataset.
Parameters:
- dataset : str, optional
The dataset to reproduce results for. Supported values are "fse-ob" (Online Boutique), "fse-ss" (Sock Shop), and "fse-tt" (Train Ticket).
- saved : bool, optional
If True, load precomputed results. If False, run the BOCPD algorithm from scratch. Default is False.
"""
assert dataset in ["fse-ob", "fse-ss", "fse-tt"], f"{dataset} is not supported!"

if not os.path.exists(f"data/{dataset}"):
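
For example, to reuse the precomputed results rather than re-running the BOCPD pass:

from baro.reproducibility import reproduce_bocpd

reproduce_bocpd(dataset="fse-ss", saved=True)
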
@@ -172,6 +188,14 @@ def reproduce_bocpd(dataset=None, saved=False):


def reproduce_rq4(dataset=None, eval_metric=None):
"""Reproduce RQ4 results for the given dataset and evaluation metric.
Parameters:
- dataset : str, optional
The dataset to reproduce results for. Supported values are "fse-ob" (Online Boutique), "fse-ss" (Sock Shop), and "fse-tt" (Train Ticket).
- eval_metric : str, optional
The evaluation metric to use. Supported values are "top1", "top3", "avg5", or None (the default, which is equivalent to "avg5").
"""
assert dataset in ["fse-ob", "fse-ss", "fse-tt"], f"{dataset} is not supported!"
assert eval_metric in [None, "top1", "top3", "avg5"], f"{eval_metric} is not supported!"

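
A matching call, using values permitted by the asserts above:

from baro.reproducibility import reproduce_rq4

# eval_metric=None would default to "avg5".
reproduce_rq4(dataset="fse-tt", eval_metric="top3")
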
88 changes: 88 additions & 0 deletions baro/root_cause_analysis.py
@@ -5,6 +5,16 @@


def select_useful_cols(data):
"""Select useful columns from the dataset based on certain criteria.
Parameters:
- data : pandas.DataFrame
The dataset to select columns from.
Returns:
- selected_cols : list
A list of selected column names.
"""
selected_cols = []
for c in data.columns:
# keep time
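
A small sketch of the documented behavior; the column names are hypothetical, since the selection criteria are not shown in this hunk:

import pandas as pd
from baro.root_cause_analysis import select_useful_cols

df = pd.DataFrame({"time": [1, 2], "cart_cpu": [0.1, 0.9], "cart_mem": [5.0, 6.0]})
df = df[select_useful_cols(df)]  # keep only the columns the helper selects
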
@@ -28,6 +38,16 @@ def select_useful_cols(data):


def drop_extra(df: pd.DataFrame):
"""Drop extra columns from the DataFrame.
Parameters:
- df : pandas.DataFrame
The DataFrame to remove extra columns from.
Returns:
- df : pandas.DataFrame
The DataFrame after removing extra columns.
"""
if "time.1" in df:
df = df.drop(columns=["time.1"])

@@ -52,6 +72,16 @@ def drop_extra(df: pd.DataFrame):


def convert_mem_mb(df: pd.DataFrame):
"""Convert memory values in the DataFrame to MBs.
Parameters:
- df : pandas.DataFrame
The DataFrame containing memory values.
Returns:
- df : pandas.DataFrame
The DataFrame with memory values converted to megabytes.
"""
# Convert memory to MBs
def update_mem(x):
if not x.name.endswith("_mem"):
@@ -64,6 +94,20 @@ def update_mem(x):


def preprocess(data, dataset=None, dk_select_useful=False):
"""Preprocess the dataset.
Parameters:
- data : pandas.DataFrame
The dataset to preprocess.
- dataset : str, optional
The dataset name. Default is None.
- dk_select_useful : bool, optional
Whether to select useful columns. Default is False.
Returns:
- data : pandas.DataFrame
The preprocessed dataset.
"""
data = drop_constant(drop_time(data))
data = convert_mem_mb(data)
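
A minimal preprocessing sketch under the same assumptions as above (the CSV path is hypothetical):

import pandas as pd
from baro.root_cause_analysis import preprocess

data = pd.read_csv("data/simple_data.csv")  # hypothetical metrics file
data = preprocess(data, dataset="fse-ob", dk_select_useful=True)
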

@@ -78,6 +122,28 @@ def preprocess(data, dataset=None, dk_select_useful=False):


def nsigma(data, inject_time=None, dataset=None, num_loop=None, sli=None, anomalies=None, **kwargs):
"""Perform nsigma analysis on the dataset.
Parameters:
- data : pandas.DataFrame
The dataset to perform nsigma analysis on.
- inject_time : int, optional
The time at which the anomalies were injected. Default is None.
- dataset : str, optional
The dataset name. Default is None.
- num_loop : int, optional
Number of loops. Default is None.
- sli : int, optional
SLI (Service Level Indicator). Default is None.
- anomalies : list, optional
List of anomalies. Default is None.
- kwargs : dict
Additional keyword arguments.
Returns:
- dict
A dictionary containing node names and ranks.
"""
if anomalies is None:
normal_df = data[data["time"] < inject_time]
anomal_df = data[data["time"] >= inject_time]
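
For illustration, a call assuming `data` carries a "time" column (as the code above requires) and that the returned dict exposes a `ranks` key, as the RobustScorer docstring below suggests:

import pandas as pd
from baro.root_cause_analysis import nsigma

data = pd.read_csv("data/simple_data.csv")  # hypothetical metrics with a "time" column
output = nsigma(data, inject_time=data["time"].iloc[len(data) // 2])
print(output["ranks"])  # ranked list of suspected root causes
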
@@ -122,6 +188,28 @@ def nsigma(data, inject_time=None, dataset=None, num_loop=None, sli=None, anomalies=None, **kwargs):
def robust_scorer(
data, inject_time=None, dataset=None, num_loop=None, sli=None, anomalies=None, **kwargs
):
"""Perform root cause analysis using RobustScorer.
Parameters:
- data : pandas.DataFrame
The data to perform RobustScorer on.
- inject_time : int, optional
The fault injection time. Default is None.
- dataset : str, optional
The dataset name. Default is None.
- num_loop : int, optional
Number of loops. Default is None. Kept only for future API compatibility.
- sli : int, optional
SLI (Service Level Indicator). Default is None. Kept only for future API compatibility.
- anomalies : list, optional
List of anomalies. Default is None.
- kwargs : dict
Additional keyword arguments.
Returns:
- dict
A dictionary containing node names and ranks. `ranks` is a ranked list of root causes.
"""
if anomalies is None:
normal_df = data[data["time"] < inject_time]
anomal_df = data[data["time"] >= inject_time]
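
Putting the pieces together, an end-to-end sketch (the path and the anomaly-detection step are assumptions, not part of this commit):

import pandas as pd
from baro.anomaly_detection import bocpd
from baro.root_cause_analysis import robust_scorer

data = pd.read_csv("data/simple_data.csv")  # hypothetical metrics file
anomalies = bocpd(data)                     # detect anomalies first
ranks = robust_scorer(data, anomalies=anomalies)["ranks"]
print(ranks[:5])                            # top-5 root cause candidates
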
25 changes: 12 additions & 13 deletions baro/utility.py
@@ -13,9 +13,7 @@


def download_online_boutique_dataset(local_path=None):
"""
Download the Online Boutique dataset from Zenodo.
"""
"""Download the Online Boutique dataset from Zenodo."""
if local_path is None:
local_path = "data"
if not os.path.exists(local_path):
@@ -29,9 +27,7 @@ def download_sock_shop_dataset(local_path=None):


def download_sock_shop_dataset(local_path=None):
"""
Download the Sock Shop dataset from Zenodo.
"""
"""Download the Sock Shop dataset from Zenodo."""
if local_path is None:
local_path = "data"
if not os.path.exists(local_path):
@@ -45,9 +41,7 @@ def download_train_ticket_dataset(local_path=None):


def download_train_ticket_dataset(local_path=None):
"""
Download the Train Ticket dataset from Zenodo.
"""
"""Download the Train Ticket dataset from Zenodo."""
if local_path is None:
local_path = "data"
if not os.path.exists(local_path):
@@ -61,20 +55,24 @@ def download_train_ticket_dataset(local_path=None):


def load_json(filename: str):
"""Load data from a JSON file."""
with open(filename) as f:
data = json.load(f)
return data


def drop_constant(df: pd.DataFrame):
"""Drop constant columns from the DataFrame."""
return df.loc[:, (df != df.iloc[0]).any()]


def drop_near_constant(df: pd.DataFrame, threshold: float = 0.1):
"""Drop columns with near-constant values from the DataFrame."""
return df.loc[:, (df != df.iloc[0]).mean() > threshold]


def drop_time(df: pd.DataFrame):
"""Drop time-related columns from the DataFrame."""
if "time" in df:
df = df.drop(columns=["time"])
if "Time" in df:
@@ -85,7 +83,7 @@


def visualize_metrics(data: pd.DataFrame, filename=None, figsize=None):
"""Visualize the metrics."""
"""Visualize metrics from the DataFrame."""
if figsize is None:
figsize = (25, 25)

@@ -128,7 +126,7 @@ def visualize_metrics(data: pd.DataFrame, filename=None, figsize=None):


def download_data(remote_url=None, local_path=None):
"""Download sample metrics data"""
"""Download data from a remote URL."""
if remote_url is None:
remote_url = "https://github.com/phamquiluan/baro/releases/download/0.0.4/simple_data.csv"
if local_path is None:
@@ -156,7 +154,7 @@ def download_data(remote_url=None, local_path=None):


def read_data(data_path, strip=True):
"""Read csv data for root cause analysis."""
"""Read CSV data for root cause analysis."""
data = pd.read_csv(data_path)
data_dir = os.path.dirname(data_path)

@@ -189,7 +187,7 @@ def read_data(data_path, strip=True):
return data

def to_service_ranks(ranks):
"""Convert fine-grained ranking to service ranks"""
"""Convert fine-grained ranking to service ranks."""
_service_ranks = [r.split("_")[0] for r in ranks]
service_ranks = []
# remove duplicates
@@ -206,6 +204,7 @@ def select_latency_and_error(data):
return data[["time"] + latency_cols + error_cols]

def find_cps(maxes):
"""Find change points given a `maxes` array."""
cps = []
for i in range(1, len(maxes)):
if abs(maxes[i] - maxes[i-1]) > 1:
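
Taken together, the utility docstrings suggest a flow like this; the local path and the example metric names are assumptions:

from baro.utility import download_data, read_data, drop_constant, to_service_ranks

download_data()  # fetches the sample simple_data.csv from the GitHub release
data = drop_constant(read_data("data/simple_data.csv"))  # hypothetical download location
print(to_service_ranks(["cart_cpu", "cart_mem", "checkout_cpu"]))
# -> ["cart", "checkout"]: service prefix kept, duplicates removed
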
