diff --git a/baro/anomaly_detection.py b/baro/anomaly_detection.py
index 58541a0..e704103 100644
--- a/baro/anomaly_detection.py
+++ b/baro/anomaly_detection.py
@@ -2,6 +2,7 @@
 warnings.filterwarnings("ignore")
 import pandas
 import numpy as np
+from baro.utility import drop_constant, find_cps
 
 def nsigma(data, k=3, startsfrom=100):
     """For each time series (column) in the data,
@@ -50,12 +51,24 @@ def bocpd(data):
     from functools import partial
     from baro._bocpd import online_changepoint_detection, constant_hazard, MultivariateT
     data = data.copy()
-    time_col = data['time']
-    data.drop(columns=['time'], inplace=True)
+
+    # select latency and error metrics from microservices
+    selected_cols = []
+    for c in data.columns:
+        if 'queue-master' in c or 'rabbitmq_' in c: continue
+        if "latency" in c or "latency-50" in c or "_error" in c:
+            selected_cols.append(c)
+    data = data[selected_cols]
+
+    # handle na
+    data = drop_constant(data)
+    data = data.fillna(method="ffill")
+    data = data.fillna(0)
     for c in data.columns:
         data[c] = (data[c] - np.min(data[c])) / (np.max(data[c]) - np.min(data[c]))
-    data = data.ffill()
+    data = data.fillna(method="ffill")
     data = data.fillna(0)
+
     data = data.to_numpy()
 
     R, maxes = online_changepoint_detection(
@@ -63,15 +76,9 @@ def bocpd(data):
         partial(constant_hazard, 50),
         MultivariateT(dims=data.shape[1])
     )
 
-    Nw = 10
-    anomalies, merged_anomalies = find_anomalies(data=R[Nw,Nw:-1].tolist(), time_col=time_col)
-    return anomalies
-
-
-def anomaly_detector(data, method="nsigma"):
-    # assert data is dataframe
-    assert isinstance(data, pandas.DataFrame)
-
-    # retain all `latency` and `error` columns
-    data = data[['latency', 'error']]
+    cps = find_cps(maxes)
+    anomalies = [p[0] for p in cps]
+    # anomalies, merged_anomalies = find_anomalies(data=R[Nw,Nw:-1].tolist(), time_col=time_col)
+    return anomalies
+
\ No newline at end of file
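
Note: `drop_constant` and `find_cps` are imported from `baro.utility`, but their bodies are not part of this patch. Below is a minimal sketch of what they are assumed to do, inferred only from how `bocpd` uses them above; the exact implementations and the run-length-drop criterion are assumptions, not the library's code.

    import pandas as pd

    def drop_constant(df: pd.DataFrame) -> pd.DataFrame:
        # Assumed behavior: drop columns with no variation so the min-max
        # normalization in bocpd() never divides by zero.
        return df.loc[:, (df != df.iloc[0]).any()]

    def find_cps(maxes) -> list:
        # Assumed behavior: `maxes` holds the most likely run length at each
        # step of BOCPD; a drop in run length means the detector believes a
        # new segment started. Return (index, drop_size) pairs.
        cps = []
        for i in range(1, len(maxes)):
            drop = maxes[i - 1] - maxes[i]
            if drop > 0:  # run length shrank => candidate change point
                cps.append((i, drop))
        return cps

Under these assumptions, `bocpd(df)` now returns the time-step indices at which the run-length estimate collapsed, which downstream code can treat as anomaly onsets, instead of the `find_anomalies` output used before this patch.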