diff --git a/README.md b/README.md index 96dee3f..1e2d402 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,112 @@ -# multi-dim-baselines -Baselines for multi-dimensional RCA +# RiskLoc +Code for the paper RiskLoc: Localization of Multi-dimensional Root Causes by Weighted Risk ([link](https://arxiv.org/abs/2205.10004)). +Contains the implementation of RiskLoc and all baseline multi-dimensional root cause localization methods. + +## Requirements +- pandas +- numpy +- scipy +- kneed (for squeeze) +- loguru (for squeeze) + +## How to run + +To run, use the `run.py` file. There are a couple of options, either to use a single file or to run all files in a directory (including all subdirectories). + +Example of running a single file using riskloc in debug mode: +``` +python run.py riskloc --run-path /data/B0/B_cuboid_layer_1_n_ele_1/1450653900.csv --debug +``` + +Example of running all files in a particular setting for a dataset (setting derived to True): +``` +python run.py riskloc --run-path /data/D/B_cuboid_layer_3_n_ele_3 --derived +``` + +Example of running all files in a dataset: +``` +python run.py riskloc --run-path /data/B0 +``` + +Example of running all datasets with 20 threads: +``` +python run.py riskloc --n-threads 20 +``` + +Changing `riskloc` to any of the supported algorithms will run those instead, see below. + +## Algorithms +The supported algorithms are: +``` +$ python run.py --help +usage: run.py [-h] {riskloc,autoroot,squeeze,old squeeze,hotspot,r_adtributor,adtributor} ... + +RiskLoc + +positional arguments: {riskloc,autoroot,squeeze,old squeeze,hotspot,r_adtributor,adtributor} + + algorithm specific help + riskloc riskloc help + autoroot autoroot help + squeeze squeeze help + hotspot autoroot help + r_adtributor r_adtributor help + adtributor adtributor help + +optional arguments: + -h, --help show this help message and exit +``` +The code for Squeeze is adapted from the recently released code from the original publication: https://github.com/NetManAIOps/Squeeze. + +To see the algorithm specific arguments run: `python run.py 'algorithm' --help`. For example, for RiskLoc: +``` +$ python run.py riskloc --help +usage: run.py riskloc [-h] [--data-root DATA_ROOT] [--run-path RUN_PATH] [--derived [DERIVED]] [--n-threads N_THREADS] [--output-suffix OUTPUT_SUFFIX] [--debug [DEBUG]] [--risk-threshold RISK_THRESHOLD] [--ep-prop-threshold EP_PROP_THRESHOLD] + +optional arguments: + -h, --help show this help message and exit + --data-root DATA_ROOT root directory for all datasets (default ./data/) + --run-path RUN_PATH directory or file to be run; + if a directory, any subdirectories will be considered as well; + must contain data-path as a prefix + --derived [DERIVED] derived dataset (defaults to True for the D dataset and False for others) + --n-threads N_THREADS number of threads to run + --output-suffix OUTPUT_SUFFIX suffix for output file + --debug [DEBUG] debug mode + --risk-threshold RISK_THRESHOLD risk threshold + --pep-threshold PEP_THRESHOLD proportional explanatory power threshold + --prune-elements [PRUNE_ELEMENTS] use element pruning (True/False) +``` + +The `risk-threshold` and `pep-threshold` arguments are specific for the RiskLoc while the rest are shared by all algorithms. To see the algorithm specific arguments for other algorithms simply run them with the `--help` flag or check the code in `run.py`. + +## Datasets +The semi-synthetic datasets can be downloaded from: https://github.com/NetManAIOps/Squeeze. 
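+
+Each dataset file is a CSV of leaf elements with a `real` column (actual values) and a `predict` column (forecast values); all remaining columns are treated as attribute dimensions, which is the format the algorithm implementations in this repository expect. A minimal sketch for inspecting one file with pandas (the path is only an example taken from the commands above):
+```
+import pandas as pd
+
+df = pd.read_csv('data/B0/B_cuboid_layer_1_n_ele_1/1450653900.csv')
+attributes = [c for c in df.columns if c not in ('real', 'predict')]
+print(attributes)                     # the attribute dimensions
+print(df[['real', 'predict']].sum())  # total actual vs. forecast volume
+```
+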
+To run these, place them within the data/ directory and name them: A, B0, B1, B2, B3, B4, and D, respectively. + +The three synthetic datasets used in the paper can be generated using `generate_dataset.py` as follows. + +S dataset: +``` +python generate_dataset.py --num 1000 --dataset-name S --seed 121 +``` +L dataset: +``` +python generate_dataset.py --num 1000 --dataset-name L --seed 122 --dims 10 24 10 15 --noise-level 0.0 0.1 --anomaly-severity 0.5 1.0 --anomaly-deviation 0.0 0.0 --num-anomaly 1 5 --num-anomaly-elements 1 1 --only-last-layer +``` +H dataset: +``` +python generate_dataset.py --num 100 --dataset-name H --seed 123 --dims 10 5 250 20 8 12 +``` + +In addition, new, intersting datasets can be created for using `generate_dataset.py` for extended emperical verification and research purposes. Supported input argments can be found at the beginning of the `generate_dataset.py` file or using the `--help` flag. + +## Citation +``` +@article{riskloc, + title={RiskLoc: Localization of Multi-dimensional Root Causes by Weighted Risk}, + author={Kalander, Marcus}, + journal={arXiv preprint arXiv:2205.10004}, + year={2022} +} +``` diff --git a/algorithms/adtributor.py b/algorithms/adtributor.py new file mode 100644 index 0000000..b173c28 --- /dev/null +++ b/algorithms/adtributor.py @@ -0,0 +1,43 @@ +import numpy as np +import pandas as pd +from utils.element_scores import add_explanatory_power, add_surpise + + +def merge_dimensions(df, dimensions, derived): + elements = pd.DataFrame(columns=list(set(df.columns) - set(dimensions)), dtype=float) + for d in dimensions: + dim = df.groupby(d).sum().reset_index() + dim['element'] = dim[d] + dim['dimension'] = d + dim = dim.drop(columns=d) + elements = pd.concat([elements, dim], axis=0, sort=False) + + if derived: + elements['predict'] = elements['predict_a'] / elements['predict_b'] + elements['real'] = elements['real_a'] / elements['real_b'] + + elements = elements.reset_index(drop=True) + return elements + + +def adtributor(df, dimensions, teep=0.1, tep=0.1, k=3, derived=False): + elements = merge_dimensions(df, dimensions, derived) + elements = add_explanatory_power(elements, derived) + elements = add_surpise(elements, derived, merged_divide=len(dimensions)) + + candidate_set = [] + for d in dimensions: + dim_elems = elements.loc[elements['dimension'] == d].set_index('element') + dim_elems = dim_elems.sort_values('surprise', ascending=False) + cumulative_ep = dim_elems.loc[dim_elems['ep'] > teep, 'ep'].cumsum() + if np.any(cumulative_ep > tep): + idx = (cumulative_ep > tep).idxmax() + candidate = {'elements': cumulative_ep[:idx].index.values.tolist(), + 'explanatory_power': cumulative_ep[idx], + 'surprise': dim_elems.loc[:idx, 'surprise'].sum(), + 'dimension': d} + candidate_set.append(candidate) + + # Sort by surprise and return the top k + candidate_set = sorted(candidate_set, key=lambda t: t['surprise'], reverse=True)[:k] + return candidate_set diff --git a/algorithms/autoroot.py b/algorithms/autoroot.py new file mode 100644 index 0000000..8ddda1b --- /dev/null +++ b/algorithms/autoroot.py @@ -0,0 +1,170 @@ +import numpy as np +import pandas as pd +from itertools import combinations +from utils.element_scores import add_deviation_score +from scipy.stats import gaussian_kde +from scipy.signal import argrelextrema + + +def get_unique_elements(df, cuboid): + return np.vstack(list({tuple(row) for row in df[cuboid].values})) + + +def get_elements_mask(df, cuboid, elements): + return np.logical_and.reduce(np.logical_or.reduce([(df[cuboid] == 
e).values for e in elements], axis=0), axis=1) + + +def nps(selection, non_selection): + sel_real, sel_pred = selection['real'], selection['predict'] + non_sel_real, non_sel_pred = non_selection['real'], non_selection['predict'] + + with np.errstate(divide='ignore', invalid='ignore'): + selection_a = np.nan_to_num(sel_pred * (sel_real.sum() / sel_pred.sum())) + + a = np.mean(np.nan_to_num(np.abs(sel_real - selection_a) / sel_real, posinf=0, neginf=0, nan=0)) + b = np.mean(np.nan_to_num(np.abs(sel_real - sel_pred) / sel_real, posinf=0, neginf=0, nan=0)) + c = np.mean(np.nan_to_num(np.abs(non_sel_real - non_sel_pred) / non_sel_real, posinf=0, neginf=0, nan=0)) + return 1 - ((a + c) / (b + c)) + + +def kde_clustering(df): + values = df['deviation'].values + + if len(np.unique(values)) == 1: + df['cluster'] = 1 + return df + + kernel = gaussian_kde(values, bw_method='silverman') + + s = np.linspace(-2, 2, 400) + e = kernel.evaluate(s) + mi = argrelextrema(e, np.less)[0] + + # All ends in reverse order + ends = sorted(np.concatenate((s[mi], [np.inf])), reverse=True) + for i, end in enumerate(ends): + df.loc[df['deviation'] <= end, 'cluster'] = i + return df + + +def is_subset(parent, child): + return all([any([p.issubset(c) for p in parent]) for c in child]) + + +def remove_crc(cluster_root_causes, elem_to_remove): + def filter_crc(crc): + root_cause_set = set([frozenset(elem) for elem in crc['elements']]) + return root_cause_set == elem_to_remove + + return [crc for crc in cluster_root_causes if not filter_crc(crc)] + + +def remove_same_layer(cluster_root_causes): + # Merge if exactly the same root cause + duplicates = [] + for p, c in combinations(enumerate(cluster_root_causes), 2): + if p[1]['layer'] == c[1]['layer']: + parent_set = set([frozenset(elems) for elems in p[1]['elements']]) + child_set = set([frozenset(elems) for elems in c[1]['elements']]) + if is_subset(parent_set, child_set): + duplicates.append(p[0]) + mask = np.full(len(cluster_root_causes), True, dtype=bool) + mask[duplicates] = False + cluster_root_causes = np.array(cluster_root_causes)[mask].tolist() + return cluster_root_causes + + +def merge_root_causes(cluster_root_causes, max_layer=4): + cluster_root_causes = remove_same_layer(cluster_root_causes) + + for layer in range(max_layer - 1, 0, -1): + layer_root_causes = [set([frozenset(elems) for elems in crc['elements']]) for crc in cluster_root_causes if + crc['layer'] == layer] + higher_layer_root_causes = [set([frozenset(elems) for elems in crc['elements']]) for crc in cluster_root_causes + if crc['layer'] > layer] + + for child in higher_layer_root_causes: + for parent in layer_root_causes: + if is_subset(parent, child): + print('parent', parent, 'child', child) + cluster_root_causes = remove_crc(cluster_root_causes, child) + return cluster_root_causes + + +def search_cluster(df, df_cluster, attributes, delta_threshold, debug=False): + z = len(df_cluster) + + best_root_cause = {'avg': -1.0} + for layer in range(1, len(attributes) + 1): + if debug: print('Layer:', layer) + cuboids = [list(c) for c in combinations(attributes, layer)] + for cuboid in cuboids: + if debug: print('Cuboid:', cuboid) + + # Way too many to go through. This is probably not what is done. 
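+            # Instead (one interpretation of the paper), the candidate splits below are single cuboid
+            # elements whose leaf counts pass the delta_threshold filters: LF = (cluster leaves in the
+            # element) / (all leaves in the element), CF = (cluster leaves in the element) / (cluster size).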
+ # elements = get_unique_elements(df_cluster, cuboid) + # splits = [t for r in range(1, len(elements) + 1) for t in list(combinations(elements, r))] + + # if last layer, we only run if CF can be above the threshold + best_candidate = {'NPS': -1.0} + if layer == len(attributes): + CF = 1 / len(df_cluster) + if CF <= delta_threshold: + continue + + xs = df_cluster.groupby(cuboid)['real'].count() + xs = xs.loc[(xs / z) > delta_threshold] + xs.name = 'x' + + ys = df.groupby(cuboid)['real'].count() + ys.name = 'y' + splits = pd.concat([xs, ys], axis=1, join='inner') + splits['LF'] = splits['x'] / splits['y'] + splits = splits.loc[splits['LF'] > delta_threshold] + + for s, row in splits.iterrows(): + split = [s] if layer == 1 else s + mask = get_elements_mask(df, cuboid, split) + + selection = df.loc[mask] + non_selection = df.loc[~mask] + nps_score = nps(selection, non_selection) + if nps_score > best_candidate['NPS']: + CF = row['x'] / z + avg_score = (nps_score + row['LF'] + CF) / 3 + candidate = {'elements': [split], 'layer': layer, 'cuboid': cuboid, + 'LF': row['LF'], 'CF': CF, 'NPS': nps_score, 'avg': avg_score} + best_candidate = candidate.copy() + + if 'elements' in best_candidate and best_candidate['avg'] > best_root_cause['avg']: + best_root_cause = best_candidate.copy() + + if 'elements' not in best_root_cause: + return None + return best_root_cause + + +def autoroot(df, attributes, delta_threshold=0.1, debug=False): + df = add_deviation_score(df) + + # Filter away the uninteresting elements with a score [-0.2,0.2]. + # (The deviation score here uses a multiple 2.) + df_relevant = df.loc[df['deviation'].abs() > 0.2].copy() + + df_relevant = kde_clustering(df_relevant) + clusters = df_relevant['cluster'].unique() + if debug: print('clusters:', clusters) + + cluster_root_causes = [] + for cluster in clusters: + if debug: print("Cluster:", cluster) + df_cluster = df_relevant.loc[df_relevant['cluster'] == cluster].copy() + + root_cause = search_cluster(df, df_cluster, attributes, delta_threshold, debug) + if root_cause is not None: + root_cause['cluster'] = cluster + cluster_root_causes.append(root_cause) + + if debug: print('root causes before merge:', cluster_root_causes) + cluster_root_causes = merge_root_causes(cluster_root_causes, max_layer=len(attributes)) + return cluster_root_causes diff --git a/algorithms/hotspot.py b/algorithms/hotspot.py new file mode 100644 index 0000000..4b9a194 --- /dev/null +++ b/algorithms/hotspot.py @@ -0,0 +1,252 @@ +import math +import random +import numpy as np +from itertools import combinations +from copy import deepcopy + + +class Node: + def __init__(self): + self.parent = None + self.state = [] + self.children = [] + self.fully_expanded = False + self.Q = 0 + self.N = 0 + + def __str__(self): + return f"node state: {self.state}, Q: {self.Q}, N: {self.N}, fully expanded: {self.fully_expanded}" + + +def ripple(v_sum, f_sum, f_leaves): + return f_leaves - (f_sum - v_sum) * (f_leaves / f_sum) if f_sum != 0 else 0 + + +def distance(v1, v2): + return np.sqrt(np.sum(np.power(v1 - v2, 2))) + + +def ps(df, v, f, selections): + a = np.copy(f) + for selection in selections: + v_sum = df.loc[selection, 'real'].sum() + f_sum = df.loc[selection, 'predict'].sum() + a[selection] = ripple(v_sum, f_sum, df.loc[selection, 'predict']) + + score = max(1 - distance(v, a) / distance(v, f), 0) + return score + + +def gps(v, f, selections): + a, b = [], [] + for selection in selections: + selection_v = v[selection] + selection_f = f[selection] + with 
np.errstate(divide='ignore', invalid='ignore'): + selection_a = f[selection] * (selection_v.sum() / selection_f.sum()) + selection_a = np.nan_to_num(selection_a) + a.extend(np.abs(selection_v - selection_a)) + b.extend(np.abs(selection_v - selection_f)) + + selection = np.logical_or.reduce(selections) + non_selection_v = v[~selection] + non_selection_f = f[~selection] + + a = np.mean(a) + b = np.mean(b) + c = np.nan_to_num(np.mean(np.abs(non_selection_v - non_selection_f))) + score = 1 - ((a + c) / (b + c)) + return score + + +def get_unqiue_elements(df, cuboid): + return {tuple(row) for row in df[cuboid].values} + + +def get_element_mask(df, cuboid, combination): + return [np.logical_and.reduce([df[d] == e for d, e in zip(cuboid, c)]) for c in combination] + + +def ucb(node, C=math.sqrt(2.0)): + best_child = None + max_score = -1 + for child in node.children: + if child.N > 0 and not child.fully_expanded: + left = child.Q + right = C * math.sqrt(math.log(node.N) / child.N) + score = left + right + if score > max_score: + best_child = child + max_score = score + return best_child + + +def init_children(node, elements): + children = [e for e in elements if e not in set(node.state)] + for c in children: + child = Node() + child.state = node.state + [c] + child.parent = node + node.children.append(child) + + +def get_initial_scores(df, elements, cuboid, v, f, scoring): + element_scores = dict() + for leaf in elements: + selections = get_element_mask(df, cuboid, [leaf]) + if scoring == 'ps': + element_scores[leaf] = ps(df.copy(), v, f, selections) + else: + element_scores[leaf] = gps(v, f, selections) + return element_scores + + +def sublist(lst1, lst2): + return set(lst1) <= set(lst2) + + +def selection(node, elements): + while len(node.state) < len(elements): + if len(node.children) == 0: # First time to search this node. + init_children(node, elements) + return node + + q_max = 0 + all_visit = True + for child in node.children: + q_max = max(q_max, child.Q) + if child.N == 0: # Not all children have been visited. + all_visit = False + + if not all_visit and random.random() > q_max: + return node # Expand current node + + child_node = ucb(node) # Select the best path got go deeper into the tree. + if child_node is None: # If all children are already fully expanded. + if all_visit: + node.fully_expanded = True + if node.parent is None: + return node # The tree is fully exanded. + node = node.parent # Continue again with parent node. + else: + return node # Expand current node. 
+ else: + node = child_node + + node.fully_expanded = True + return node + + +def expand(node, element_scores): + best_child = None + max_score = -1 + for child in node.children: + if child.N == 0: + score = element_scores[child.state[-1]] + if score > max_score: + max_score = score + best_child = child + return best_child + + +def evaluate(df, selected_node, cuboid, v, f, scoring): + selections = get_element_mask(df, cuboid, selected_node.state) + if scoring == 'ps': + score = ps(df.copy(), v, f, selections) + else: + score = gps(v, f, selections) + return score + + +def backup(node, new_q): + while node is not None: + node.N += 1 + node.Q = max(node.Q, new_q) + node = node.parent + + +def MCTS(df, elements, cuboid, v, f, pt, m, scoring): + root = Node() + max_q = -1 + best_selection = Node() + + element_scores = get_initial_scores(df, elements, cuboid, v, f, scoring) + for i in range(m): + node = selection(root, elements) + if not node.fully_expanded: + node = expand(node, element_scores) + + if root.fully_expanded: + break + + new_q = evaluate(df, node, cuboid, v, f, scoring) + backup(node, new_q) + + if new_q > max_q: + max_q = root.Q + best_selection = deepcopy(node) + elif (new_q == max_q) and not sublist(node.state, best_selection.state) and len(node.state) < len( + best_selection.state): + max_q = root.Q + best_selection = deepcopy(node) + + if max_q >= pt: + break + + return best_selection.state, max_q + + +def hierarchical_pruning(elements, layer, cuboid, candidate_set): + previous_layer_candidates = [candidate for candidate in candidate_set if candidate['layer'] == layer - 1] + parent_selections = [cand['elements'] for cand in previous_layer_candidates if set(cand['cuboid']) < set(cuboid)] + + for parent_selection in parent_selections: + elements = [e for e in elements if np.any([set(pe) < set(e) for pe in parent_selection])] + return elements + + +def get_best_candidate(candidate_set): + # Sort by score, layer, number of elements + sorted_cands = sorted(candidate_set, key=lambda c: (c['score'], -c['layer'], -len(c['elements'])), reverse=True) + return sorted_cands[0] + + +def hotspot(df, dimensions, pt=0.67, m=200, scoring='gps', debug=False): + assert scoring in ['ps', 'gps'], "Supported scoring is 'ps' and 'gps'." 
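+    # 'ps' is the potential score from HotSpot: expected leaf values under the candidate set are derived
+    # via the ripple effect and scored by the distance between the real and expected vectors (see ps()).
+    # 'gps' is the generalized potential score (as used in Squeeze), based on mean absolute deviations
+    # between real and adjusted/predicted values (see gps()).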
+ + # Hierarcical pruning does not seem to work well when using gps scoring + use_pruning = scoring != 'gps' + + v = df['real'].values + f = df['predict'].values + + candidate_set = [] + for layer in range(1, len(dimensions) + 1): + if debug: print('Layer:', layer) + cuboids = [list(c) for c in combinations(dimensions, layer)] + for cuboid in cuboids: + if debug: print('Cuboid:', cuboid) + + elements = get_unqiue_elements(df, cuboid) + # if debug: print('Elements:', elements) + + if use_pruning and layer > 1: + elements = hierarchical_pruning(elements, layer, cuboid, candidate_set) + # if debug: print('Filtered elements:', elements) + + selected_set, score = MCTS(df, elements, cuboid, v, f, pt, m, scoring) + if debug: print('Best subset:', selected_set, 'score', score) + + candidate = { + 'layer': layer, + 'cuboid': cuboid, + 'score': score, + 'elements': np.array(selected_set) + } + + if candidate['score'] >= pt: + return candidate + + candidate_set.append(candidate) + + return get_best_candidate(candidate_set) diff --git a/algorithms/rev_rec_adtributor.py b/algorithms/rev_rec_adtributor.py new file mode 100644 index 0000000..fabb7e2 --- /dev/null +++ b/algorithms/rev_rec_adtributor.py @@ -0,0 +1,74 @@ +import numpy as np +from utils.element_scores import add_explanatory_power, add_surpise +from algorithms.adtributor import merge_dimensions + + +def remove_duplicates(explanatory_set): + seen = dict() + for cs in explanatory_set: + key = ''.join(np.array(cs['elements']).flatten()) + if key not in seen: + seen[key] = cs + return list(seen.values()) + + +def rev_adtributor(df, dimensions, teep=0.1, k=3, derived=False): + elements = merge_dimensions(df, dimensions, derived) + elements = add_explanatory_power(elements, derived) + elements = add_surpise(elements, derived, merged_divide=len(dimensions)) + + explainatory_set = [] + for d in dimensions: + partition_set = elements.loc[elements['dimension'] == d].set_index('element') + candidate_set = partition_set.loc[partition_set['ep'] > teep] + + if 0 < len(candidate_set) < len(partition_set): + candidate = {'elements': candidate_set.index.values.tolist(), + 'explanatory_power': candidate_set['ep'].sum(), + 'surprise': candidate_set['surprise'].sum(), + 'dimension': d} + explainatory_set.append(candidate) + + # Sort by surprise and return the top k + explainatory_set = sorted(explainatory_set, key=lambda t: t['surprise'], reverse=True)[:k] + return explainatory_set + + +def rev_rec_adtributor(df, dimensions, teep=0.1, k=3, derived=False): + explanatory_set = rev_adtributor(df, dimensions, teep, k, derived) + + new_explanatory_set = [] + for candidate_set in explanatory_set: + candidate_set['elements'] = [[e] for e in candidate_set['elements']] + candidate_set['cuboid'] = [candidate_set['dimension']] + + # Only search dimensions 'after' the current dimension to avoid repeating (ordered differently) elements. + remaining_dims = list(set(dimensions) - set(candidate_set['dimension'])) + + # Keep the old candidate set if no deeper explainatory set is found within a subcube. 
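+        # For each element found in this dimension, the data is restricted to that element and the
+        # remaining dimensions are searched recursively; any deeper findings are combined with the
+        # current element into multi-dimensional root-cause candidates.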
+ new_candidate_set = [] + if len(remaining_dims) > 0: + for candidate in candidate_set['elements']: + df_candidate = df[df[candidate_set['dimension']] == candidate[0]].copy() + c_explanatory_set = rev_rec_adtributor(df_candidate, remaining_dims, teep, k, derived) + + if len(c_explanatory_set) == 0: + # if one of the candidates do not have any explainatory_set then + # we use the candidate set of this layer + new_candidate_set = [] + break + + for es in c_explanatory_set: + es['elements'] = [sorted(e + candidate) for e in es['elements']] + es['explanatory_power'] = es['explanatory_power'] * candidate_set['explanatory_power'] + es['cuboid'] = sorted(candidate_set['cuboid'] + es['cuboid']) + new_candidate_set.extend(c_explanatory_set) + + if len(new_candidate_set) > 0: + new_explanatory_set.extend(new_candidate_set) + else: + new_explanatory_set.append(candidate_set) + + # Remove any duplicates + new_explanatory_set = remove_duplicates(new_explanatory_set) + return new_explanatory_set diff --git a/algorithms/riskloc.py b/algorithms/riskloc.py new file mode 100644 index 0000000..fd7ca63 --- /dev/null +++ b/algorithms/riskloc.py @@ -0,0 +1,213 @@ +import math +import numpy as np +from itertools import combinations +from collections import defaultdict +from utils.element_scores import add_explanatory_power, add_deviation_score + + +def get_element_mask(df, cuboid, element): + return np.logical_and.reduce((df[cuboid] == element).values, axis=1) + + +def add_weight(df, cutoff): + df['weight'] = (cutoff - df['deviation']).abs() + df.loc[(df['real'] == 0) & (df['predict'] == 0), 'weight'] = 0 + df['weight'] = np.where(df['partition'] == 1, df['deviation'].abs(), df['weight']) + df.loc[df['weight'] > 1, 'weight'] = 1 # max weight is 1 + return df + + +def add_partition(df, cutoff): + if cutoff == 0: + # We need to find the correct direction by checking the sign + anomaly_right = math.copysign(1, cutoff) > 0 + else: + anomaly_right = cutoff >= 0 + + df['partition'] = 0 + if anomaly_right: + df.loc[df['deviation'] > cutoff, 'partition'] = 1 + else: + df.loc[df['deviation'] < cutoff, 'partition'] = 1 + return df + + +def get_cutoff(df, n_remove=5): + devs = df['deviation'].drop_duplicates() + + # TODO: Ablation test + # n_remove = 0 + + min_val = devs.nsmallest(n_remove + 1).max() + max_val = devs.nlargest(n_remove + 1).min() + t = -min(min_val, max_val, key=abs) + return t + + +def high_risk(selection): + n_anomaly = selection.loc[selection['partition'] == 1, 'weight'].sum() + n_normal = selection.loc[selection['partition'] == 0, 'weight'].sum() + 1 + high_risk_score = n_anomaly / (n_anomaly + n_normal) + return high_risk_score + + +def low_risk(selection): + # We can only do relative comparision when the real value != 0. + # Predict = 0 will give a = 0, so not relevant for our d / selection['dev'] comparison. 
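+    # The low risk score compares the mean relative deviation after proportionally re-allocating the
+    # selection's total real value over its predicted values (a below) with the selection's mean
+    # deviation score; the intuition is that for a selection behaving like a genuine root cause the
+    # re-allocation fits well, yielding a low score.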
+ selection = selection.loc[(selection['real'] != 0) & (selection['predict'] != 0)] + + low_risk_score = 0.0 + if len(selection) > 0: + a = selection['predict'] * selection['real'].sum() / selection['predict'].sum() + d = (2 * (a - selection['real']) / (a + selection['real'])).fillna(0.0) + + w1 = d.abs().mean() + w2 = selection['deviation'].abs().mean() + + if w2 != 0.0: + low_risk_score = w1 / w2 + return low_risk_score + + +def element_pruning(df, cuboid, pruned_elements): + if pruned_elements is None: + return df, ['ep', 'partition'] + + df_c = df.copy() + keys = [key for key in pruned_elements.keys() if set(cuboid).issuperset(set(key))] + for key in keys: + ics = pruned_elements[key] + # print('ignored cuboids:', ics) + if len(ics) > 0: + df_c = df_c.loc[~df_c.set_index(list(key)).index.isin(ics)] + return df_c, ['ep', 'ep_z', 'partition'] + + +# TODO: TEST (org 3) +def add_prune_element(eps, adj_ep_threshold, layer, cuboid, pruned_elements, max_layer=1): + if layer <= max_layer: + ics = eps.loc[(eps['ep_z'] < adj_ep_threshold) | (eps['partition'] == 0)] + if len(ics) > 0: + ics = ics.index.tolist() + # print('added ignored cuboids:', ics) + pruned_elements[tuple(cuboid)].extend(ics) + return pruned_elements + + +def search_anomaly(df, attributes, pruned_elements, risk_threshold=0.5, adj_ep_threshold=0.0, debug=True): + for layer in range(1, len(attributes) + 1): + if debug: print('Layer:', layer) + cuboids = [list(c) for c in combinations(attributes, layer)] + + best_root_cause = {'ep_score': adj_ep_threshold} + for cuboid in cuboids: + if debug: print('Cuboid:', cuboid) + + # Prune irrelevant cuboids if needed + df_c, sum_cols = element_pruning(df, cuboid, pruned_elements) + + # Compute the cuboid element values + eps = df_c.groupby(cuboid)[sum_cols].sum() + + # Check for cuboid pruning + # need to use ep_z here instead of ep to make sure no potential root causes are missed. + if pruned_elements is not None: + pruned_elements = add_prune_element(eps, adj_ep_threshold, layer, cuboid, pruned_elements) + + # Filter away elements with too low EP scores. + # Make sure at least 1 leaf element is in the anomaly partition (1). + eps = eps.loc[(eps['partition'] > 0) & (eps['ep'] > best_root_cause['ep_score'])] + eps = eps['ep'].sort_values(ascending=False) + + for e, ep_score in eps.iteritems(): + element = (e,) if layer == 1 else e + + mask = get_element_mask(df_c, cuboid, element) + selection = df_c[mask] + + high_risk_score = high_risk(selection) + low_risk_score = low_risk(selection) + + # TODO: Ablation test + # high_risk_score = 1.0 + + # TODO: Ablation test + # low_risk_score = 0.0 + + risk_score = high_risk_score - low_risk_score + + if debug: print('element', element, 'ep score', ep_score, 'high', high_risk_score, + 'low', low_risk_score, 'risk', risk_score) + + if risk_score >= risk_threshold: + if debug: print('New best score') + + best_root_cause = { + 'elements': [element], + 'high risk score': high_risk_score, + 'low risk score': low_risk_score, + 'risk score': risk_score, + 'ep_score': ep_score, + 'layer': layer, + 'cuboid': cuboid, + } + + # We know the eps scores are in order (i.e., this is the one with highest ep in this cuboid) + # so we continue with the next cuboid in this layer. 
+ break + + # If an element has been found + if 'elements' in best_root_cause: + return best_root_cause, pruned_elements + return None, pruned_elements + + +def riskloc(df, attributes, risk_threshold=0.5, pep_threshold=0.02, derived=False, prune_elements=True, + debug=False): + df = add_explanatory_power(df, False) # TODO: TESTING + df = add_deviation_score(df) + + cutoff = get_cutoff(df) + if debug: print('cutoff:', cutoff) + + # TODO: Ablation test + # cutoff = -0.2 if cutoff < 0 else 0.2 + + df = add_partition(df, cutoff) + df = add_weight(df, cutoff) + + # TODO: Ablation test + # df['weight'] = 1 + + # Negate all EP values if the sum if negative for the abnormal part. + anomaly_ep_sum = df.loc[df['partition'] == 1, 'ep'].sum() + if anomaly_ep_sum < 0: + df['ep'] = -df['ep'] + anomaly_ep_sum *= -1 + + # Get the adjusted EP threshold + adj_ep_threshold = anomaly_ep_sum * pep_threshold + + # Compute the ep_{>0} values (for element pruning) + df['ep_z'] = np.where(df['ep'] > 0, df['ep'], 0) + + root_causes = [] + pruned_elements = defaultdict(list) if prune_elements else None + while True: + anomaly_ep_sum = df.loc[df['partition'] == 1, 'ep'].sum() + if debug: print('ep sum:', anomaly_ep_sum, 'ths', adj_ep_threshold) + if anomaly_ep_sum < adj_ep_threshold: + break + + root_cause, pruned_elements = search_anomaly(df, attributes, pruned_elements, risk_threshold, adj_ep_threshold, + debug) + if root_cause is None: + break + + if debug: print('Found root cause', root_cause) + + root_causes.append(root_cause) + mask = get_element_mask(df, root_cause['cuboid'], root_cause['elements'][0]) + df = df.loc[~mask] + + return root_causes diff --git a/algorithms/squeeze/__init__.py b/algorithms/squeeze/__init__.py new file mode 100644 index 0000000..a72d2c2 --- /dev/null +++ b/algorithms/squeeze/__init__.py @@ -0,0 +1,2 @@ +from .squeeze import * +from .squeeze_option import * diff --git a/algorithms/squeeze/__pycache__/__init__.cpython-37.pyc b/algorithms/squeeze/__pycache__/__init__.cpython-37.pyc new file mode 100644 index 0000000..dcb1377 Binary files /dev/null and b/algorithms/squeeze/__pycache__/__init__.cpython-37.pyc differ diff --git a/algorithms/squeeze/__pycache__/anomaly_amount_fileter.cpython-37.pyc b/algorithms/squeeze/__pycache__/anomaly_amount_fileter.cpython-37.pyc new file mode 100644 index 0000000..4ed8d6b Binary files /dev/null and b/algorithms/squeeze/__pycache__/anomaly_amount_fileter.cpython-37.pyc differ diff --git a/algorithms/squeeze/__pycache__/attribute_combination.cpython-37.pyc b/algorithms/squeeze/__pycache__/attribute_combination.cpython-37.pyc new file mode 100644 index 0000000..47a62f1 Binary files /dev/null and b/algorithms/squeeze/__pycache__/attribute_combination.cpython-37.pyc differ diff --git a/algorithms/squeeze/__pycache__/squeeze.cpython-37.pyc b/algorithms/squeeze/__pycache__/squeeze.cpython-37.pyc new file mode 100644 index 0000000..9659e9e Binary files /dev/null and b/algorithms/squeeze/__pycache__/squeeze.cpython-37.pyc differ diff --git a/algorithms/squeeze/__pycache__/squeeze_option.cpython-37.pyc b/algorithms/squeeze/__pycache__/squeeze_option.cpython-37.pyc new file mode 100644 index 0000000..bb9214c Binary files /dev/null and b/algorithms/squeeze/__pycache__/squeeze_option.cpython-37.pyc differ diff --git a/algorithms/squeeze/anomaly_amount_fileter.py b/algorithms/squeeze/anomaly_amount_fileter.py new file mode 100644 index 0000000..04198d1 --- /dev/null +++ b/algorithms/squeeze/anomaly_amount_fileter.py @@ -0,0 +1,25 @@ +import numpy as np +from kneed 
import KneeLocator +from loguru import logger +from scipy.stats import gaussian_kde + + +class KPIFilter: + def __init__(self, real_array, predict_array): + # self.select_metrics = np.log(np.abs(real_array - predict_array) + 1) / 10 + self.select_metrics = np.abs(real_array - predict_array) + # self.select_metrics = np.abs(predict_array - real_array) / np.abs(real_array + predict_array) + kernel = gaussian_kde(self.select_metrics) + _x = sorted(np.linspace(np.min(self.select_metrics), np.max(self.select_metrics), 1000)) + _y = np.cumsum(kernel(_x)) + knee = KneeLocator(_x, _y, curve='concave', direction='increasing').knee + logger.info(f"kneed: {knee}") + if knee is None: + logger.warning("no knee point found") + knee = np.min(self.select_metrics) + self.filtered_indices = np.where(self.select_metrics > knee) + + self.original_indices = np.arange(len(real_array))[self.filtered_indices] + + def inverse_map(self, indices): + return self.original_indices[indices] diff --git a/algorithms/squeeze/attribute_combination.py b/algorithms/squeeze/attribute_combination.py new file mode 100644 index 0000000..71d1633 --- /dev/null +++ b/algorithms/squeeze/attribute_combination.py @@ -0,0 +1,153 @@ +import copy +from functools import reduce +import numpy as np +import pandas as pd +from typing import FrozenSet, Iterable + + +class AttributeCombination(dict): + ANY = '__ANY__' + + def __init__(self, **kwargs): + super().__init__(**{key: str(value) for key, value in kwargs.items()}) + self.__id = None + self.non_any_keys = tuple() + self.non_any_values = tuple() + self.__is_terminal = False + self.__update() + + def __update(self): + self.__id = tuple((key, self[key]) for key in sorted(self.keys())) + self.non_any_keys = tuple(_ for _ in sorted(self.keys()) if self[_] != self.ANY) + self.non_any_values = tuple(self[_] for _ in sorted(self.keys()) if self[_] != self.ANY) + self.__is_terminal = not any(self.ANY == value for value in self.values()) + + def __eq__(self, other: 'AttributeCombination'): + return self.__id == other.__id + + def __lt__(self, other): + return self.__id < other.__id + + def __le__(self, other): + return self.__id <= other.__id + + def __hash__(self): + return hash(self.__id) + + def __setitem__(self, key, value): + super().__setitem__(key, str(value)) + self.__update() + + def __str__(self): + return "&".join(f"{key}={value}" for key, value in zip(self.non_any_keys, self.non_any_values)) + + @staticmethod + def from_string(string: str, attribute_names) -> 'AttributeCombination': + ret = AttributeCombination.get_root_attribute_combination(attribute_names) + for pair in string.split("&"): + if pair == "": + continue + key, value = pair.split("=") + ret[key] = value + return ret + + @staticmethod + def batch_from_string(string: str, attribute_names) -> 'FrozenSet[AttributeCombination]': + return frozenset({AttributeCombination.from_string(_, attribute_names) for _ in string.split(";")}) + + @staticmethod + def batch_to_string(sets: Iterable['AttributeCombination']) -> str: + return ";".join(str(_) for _ in sets) + + def copy_and_update(self, other): + o = copy.copy(self) + o.update(other) + o.__update() + return o + + @staticmethod + def get_attribute_combination(data: pd.DataFrame): + columns = list(set(data.columns) - {'real', 'predict'}) + _attributes = AttributeCombination() + for column in columns: + _attributes[column] = AttributeCombination.ANY + return _attributes + + def index_dataframe_without_index(self, data: pd.DataFrame): + # noinspection PyTypeChecker + return 
reduce(np.logical_and, + [data[key] == value for key, value in self.items() if value != self.ANY], + np.ones(len(data), dtype=bool)) + + def index_dataframe(self, data: pd.DataFrame): + if len(self.non_any_values) == 0: + return np.ones(len(data), dtype=np.bool) + try: + arr = np.zeros(shape=len(data), dtype=np.bool) + if len(self.non_any_values) == 1: + idx = data.index.get_loc(self.non_any_values[0]) + else: + idx = data.index.get_loc(self.non_any_values) + arr[idx] = True + return arr + except KeyError: + return np.zeros(len(data), dtype=np.bool) + + def is_terminal(self): + return self.__is_terminal + + @staticmethod + def batch_index_dataframe(attribute_combinations, data: pd.DataFrame): + # noinspection PyTypeChecker + index = reduce(np.logical_or, + (_.index_dataframe(data) for _ in attribute_combinations), + np.zeros(len(data), dtype=np.bool)) + return index + + @staticmethod + def batch_index_dataframe_without_index(attribute_combinations, data: pd.DataFrame): + # noinspection PyTypeChecker + index = reduce(np.logical_or, + (_.index_dataframe_without_index(data) for _ in attribute_combinations), + np.zeros(len(data), dtype=np.bool)) + return index + + @staticmethod + def get_root_attribute_combination(attribute_names): + return AttributeCombination(**{key: AttributeCombination.ANY for key in attribute_names}) + + def is_descent(self, other): + return all(self.__attribute_is_descent(sorted(item_a), sorted(item_b)) + for item_a, item_b in zip(self.items(), other.items())) + + @staticmethod + def __attribute_is_descent(a, b): + return a[0] == b[0] and (a[1] == b[1] or b[1] == AttributeCombination.ANY) + + def mask(self, keys): + """ + :param keys: keep which keys + :return: a new attribute combination, keep keys, the others are set ANY + """ + to_fill_keys = set(self.keys()) - set(keys) + return self.copy_and_update({key: self.ANY for key in to_fill_keys}) + + @staticmethod + def from_iops_2019_format(string: str, attribute_names=None) -> FrozenSet['AttributeCombination']: + """ + :param attribute_names: + :param string: + :return: + """ + if attribute_names is None: + attribute_names = ['i', 'e', 'c', 'p', 'l'] + root = AttributeCombination(**{key: AttributeCombination.ANY for key in attribute_names}) + results = {root.copy_and_update({_[0]: _ for _ in case.split('&') if _ != ''}) for case in string.split(';')} + return frozenset(results) + + @staticmethod + def to_iops_2019_format(attribute_combinations: Iterable['AttributeCombination']): + return ";".join("&".join(_.non_any_values) for _ in attribute_combinations) + + +AC = AttributeCombination diff --git a/algorithms/squeeze/clustering/__init__.py b/algorithms/squeeze/clustering/__init__.py new file mode 100644 index 0000000..9d0c9c4 --- /dev/null +++ b/algorithms/squeeze/clustering/__init__.py @@ -0,0 +1,9 @@ +from .cluster import * +from .density_cluster import * + + +def cluster_factory(option: SqueezeOption): + method_map = { + "density": DensityBased1dCluster, + } + return method_map[option.cluster_method](option) diff --git a/algorithms/squeeze/clustering/__pycache__/__init__.cpython-37.pyc b/algorithms/squeeze/clustering/__pycache__/__init__.cpython-37.pyc new file mode 100644 index 0000000..d637e15 Binary files /dev/null and b/algorithms/squeeze/clustering/__pycache__/__init__.cpython-37.pyc differ diff --git a/algorithms/squeeze/clustering/__pycache__/cluster.cpython-37.pyc b/algorithms/squeeze/clustering/__pycache__/cluster.cpython-37.pyc new file mode 100644 index 0000000..732261e Binary files /dev/null and 
b/algorithms/squeeze/clustering/__pycache__/cluster.cpython-37.pyc differ diff --git a/algorithms/squeeze/clustering/__pycache__/density_cluster.cpython-37.pyc b/algorithms/squeeze/clustering/__pycache__/density_cluster.cpython-37.pyc new file mode 100644 index 0000000..5771289 Binary files /dev/null and b/algorithms/squeeze/clustering/__pycache__/density_cluster.cpython-37.pyc differ diff --git a/algorithms/squeeze/clustering/cluster.py b/algorithms/squeeze/clustering/cluster.py new file mode 100644 index 0000000..a6b430d --- /dev/null +++ b/algorithms/squeeze/clustering/cluster.py @@ -0,0 +1,16 @@ +from ..squeeze_option import SqueezeOption +from typing import List +import numpy as np + + +class Cluster: + """ + one dim cluster, give a 1d-array, return each clusters indices + """ + + def __init__(self, option: SqueezeOption): + self.option = option + + def __call__(self, array) -> List[np.ndarray]: + raise NotImplementedError() + diff --git a/algorithms/squeeze/clustering/density_cluster.py b/algorithms/squeeze/clustering/density_cluster.py new file mode 100644 index 0000000..1c6556f --- /dev/null +++ b/algorithms/squeeze/clustering/density_cluster.py @@ -0,0 +1,141 @@ +import seaborn as sns +import numpy as np +from loguru import logger +from scipy.stats import gaussian_kde +from scipy.signal import argrelextrema +import matplotlib.pyplot as plt +from algorithms.squeeze.clustering.cluster import Cluster +from algorithms.squeeze.squeeze_option import SqueezeOption + + +def smooth(arr, window_size): + new_arr = np.convolve(arr, np.ones(window_size), mode="valid") / window_size + new_arr = np.concatenate([arr[:window_size - 1], new_arr]) + assert np.shape(new_arr) == np.shape(arr) + return new_arr + + +class DensityBased1dCluster(Cluster): + def __init__(self, option: SqueezeOption): + super().__init__(option) + assert option.density_estimation_method in {'kde', 'histogram'} + self.density_estimation_func = { + "kde": self._kde, + "histogram": self._histogram, + }[option.density_estimation_method] + + def _kde(self, array: np.ndarray): + kernel = gaussian_kde(array, bw_method=self.option.kde_bw_method, weights=self.option.kde_weights) + samples = np.arange(np.min(array), np.max(array), 0.01) + kde_sample = kernel(points=samples) + conv_kernel = self.option.density_smooth_conv_kernel + kde_sample_smoothed = np.convolve(kde_sample, conv_kernel, 'full') / np.sum(conv_kernel) + return kde_sample_smoothed, samples + + def _histogram(self, array: np.ndarray): + def _get_hist(_width): + if _width == 'auto': + _edges = np.histogram_bin_edges(array, 'auto').tolist() + _edges = [_edges[0] - 0.1 * i for i in range(-5, 0, -1)] + _edges + [_edges[-1] + 0.1 * i for i in + range(1, 6)] + else: + _edges = np.arange(array_range[0] - _width * 6, array_range[1] + _width * 5, _width) + h, edges = np.histogram(array, bins=_edges, density=True) + h /= 100. 
+ # conv_kernel = self.option.density_smooth_conv_kernel + # h = np.convolve(h, conv_kernel, 'full') / np.sum(conv_kernel) + return h, np.convolve(edges, [1, 1], 'valid') / 2 + + def _get_score(_clusters): + if len(_clusters) <= 0: + return float('-inf') + _mu = np.concatenate([np.repeat(np.mean(array[idx]), np.size(idx)) for idx in _clusters]) + _sigma = np.concatenate([np.repeat(np.std(array[idx]), np.size(idx)) for idx in _clusters]) + 1e-8 + # _arrays = np.concatenate([array[idx] for idx in _clusters]) + # _scores = np.sum(- np.log(_sigma) - np.square((_arrays - _mu) / _sigma)) + _scores = np.max(_sigma) + return _scores + + array_range = np.min(array), np.max(array) + width = self.option.histogram_bar_width + # if width == 'auto': + # x_list = [0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.10] + # hists = [_get_hist(_width) for _width in x_list] + # # y_list = [len(argrelextrema( + # # _get_hist(_width=_width)[0], comparator=np.greater_equal, + # # axis=0, order=self.option.cluster_smooth_window_size, mode='clip')[0]) for _width in x_list] + # clusters_list = [self._cluster(array, density_array, bins) for density_array, bins in hists] + # y_list = [_get_score(clusters) for clusters in clusters_list] + # split = KneeLocator(x_list, y_list, curve='concave', direction='increasing').knee + # if split is None: + # split = x_list[0] + # # elbow = x_list[np.argmax(y_list)] + # logger.debug(f"{x_list}, {y_list}, {split}") + # width = split + + return _get_hist(width) + + def _cluster(self, array, density_array: np.ndarray, bins, plot=False): + def significant_greater(a, b): + return (a - b) / (a + b) > 0.1 + + order = 1 + extreme_max_indices = argrelextrema( + density_array, comparator=lambda x, y: x > y, + axis=0, order=order, mode='wrap')[0] + extreme_min_indices = argrelextrema( + density_array, comparator=lambda x, y: x <= y, + axis=0, order=order, mode='wrap')[0] + extreme_max_indices = list(filter(lambda x: density_array[x] > 0, extreme_max_indices)) + if plot: + for idx in extreme_max_indices: + plt.axvline(bins[idx], linestyle="-", color="red", label="relmax", alpha=0.5, linewidth=0.8) + for idx in extreme_min_indices: + plt.axvline(bins[idx], linestyle="--", color="blue", label="relmin", alpha=0.5, linewidth=0.8) + + cluster_list = [] + boundaries = np.asarray([float('-inf')] + [bins[index] for index in extreme_min_indices] + [float('+inf')]) + if self.option.max_normal_deviation == 'auto': + mu = np.mean(np.abs(array)) + max_normal = mu + logger.debug(f"max normal {max_normal}") + self.option.max_normal_deviation = max_normal + for index in extreme_max_indices: + left_boundary = boundaries[np.searchsorted(boundaries, bins[index], side='right') - 1] + right_boundary = boundaries[np.searchsorted(boundaries, bins[index], side='left')] + cluster_indices = np.where( + np.logical_and( + array <= right_boundary, + array >= left_boundary, + ) + )[0] + cluster = array[cluster_indices] + mu = np.mean(np.abs(cluster)) + logger.debug(f"({left_boundary, right_boundary}, {mu})") + if np.abs(mu) < self.option.max_normal_deviation or len(cluster) <= 0: + continue + cluster_list.append(cluster_indices) + return cluster_list + + def __call__(self, array): + array = array.copy() + density_array, bins = self.density_estimation_func(array) + density_array = np.copy(density_array) + if self.option.cluster_smooth_window_size == "auto": + window_size = max(np.count_nonzero(density_array > 0) // 10, 1) + logger.debug(f"auto window size: {window_size} {np.count_nonzero(density_array > 0)}") + else: 
+ window_size = self.option.cluster_smooth_window_size + smoothed_density_array = smooth(density_array, window_size) + if self.option.debug: + fig, ax1 = plt.subplots(figsize=(3.6, 1.8)) + sns.distplot(array, bins='auto', label="density", hist=True, kde=False, norm_hist=True, ax=ax1) + ax1.set_ylim([0, None]) + + clusters = self._cluster(array, smoothed_density_array, bins, plot=self.option.debug) + if self.option.debug: + for cluster in clusters: + left_boundary, right_boundary = np.min(array[cluster]), np.max(array[cluster]) + logger.debug(f"cluster: [{left_boundary}, {right_boundary}]") + return clusters + diff --git a/algorithms/squeeze/squeeze.py b/algorithms/squeeze/squeeze.py new file mode 100644 index 0000000..3bbc594 --- /dev/null +++ b/algorithms/squeeze/squeeze.py @@ -0,0 +1,364 @@ +import numpy as np +import pandas as pd +from functools import lru_cache +from itertools import combinations +from typing import List, FrozenSet, Union +from loguru import logger +from typing import Tuple +from algorithms.squeeze.attribute_combination import AttributeCombination as AC +from algorithms.squeeze.anomaly_amount_fileter import KPIFilter +from algorithms.squeeze.squeeze_option import SqueezeOption +from algorithms.squeeze.clustering import cluster_factory +from scipy.spatial.distance import cityblock +from copy import deepcopy + + +class Squeeze: + def __init__(self, data_list: List[pd.DataFrame], op=lambda x: x, option: SqueezeOption = SqueezeOption()): + """ + :param data_list: dataframe without index, + must have 'real' and 'predict' columns, other columns are considered as attributes + all elements in this list must have exactly the same attribute combinations in the same order + """ + self.option = option + + self.one_dim_cluster = cluster_factory(self.option) + self.cluster_list = [] # type: List[np.ndarray] + + valid_idx = np.logical_and.reduce( + [_.predict > 0 for _ in data_list], + ) + + self.data_list = list(_[valid_idx] for _ in data_list) + self.op = op + self.derived_data = self.get_derived_dataframe(None) # type: pd.DataFrame + # There is an error in injection + self.derived_data.real -= min(np.min(self.derived_data.real), 0) + + self.attribute_names = list(sorted(set(self.derived_data.columns) - {'real', 'predict'})) + logger.debug(f"available attributes: {self.attribute_names}") + + self.derived_data.sort_values(by=self.attribute_names, inplace=True) + self.data_list = list(map(lambda x: x.sort_values(by=self.attribute_names), self.data_list)) + + self.attribute_values = list(list(set(self.derived_data.loc[:, name].values)) for name in self.attribute_names) + logger.debug(f"available values: {self.attribute_values}") + + self.ac_array = np.asarray( + [AC(**record) for record in self.derived_data[self.attribute_names].to_dict(orient='records')]) + + self._v = self.derived_data['real'].values + self._f = self.derived_data['predict'].values + assert all(self._v >= 0) and all(self._f >= 0), \ + f"currently we assume that KPIs are non-negative, {self.derived_data[~(self._f >= 0)]}" + + self.__finished = False + self._root_cause = [] + + self.filtered_indices = None + + @property + @lru_cache() + def root_cause(self): + return self._root_cause + + @property + @lru_cache() + def root_cause_string_list(self): + unique_root_cause = np.unique(self.root_cause) + root_cause = list(AC.batch_to_string(_) for _ in unique_root_cause) + return root_cause + + @property + @lru_cache() + def report(self) -> str: + cluster_impacts = [ + np.sum(np.abs(self._f[idx] - self._v[idx])) for idx in 
self.cluster_list + ] + unique_root_cause, rc_indies = np.unique(self.root_cause, return_index=True) + cluster_impacts = [ + np.sum(cluster_impacts[idx]) for idx in rc_indies + ] + logger.debug(f"{unique_root_cause}, {cluster_impacts}") + report_df = pd.DataFrame(columns=['root_cause', 'impact']) + report_df['root_cause'] = list(AC.batch_to_string(_) for _ in unique_root_cause) + report_df['impact'] = cluster_impacts + report_df.sort_values(by=['impact'], inplace=True, ascending=False) + return report_df.to_csv(index=False) + + @lru_cache() + def get_cuboid_ac_array(self, cuboid: Tuple[str, ...]): + return np.asarray(list(map(lambda x: x.mask(cuboid), self.ac_array))) + + @lru_cache() + def get_indexed_data(self, cuboid: Tuple[str, ...]): + return self.derived_data.set_index(list(cuboid)) + + @property + @lru_cache() + def normal_indices(self): + abnormal = np.sort(np.concatenate(self.cluster_list)) + idx = np.argsort(np.abs(self.leaf_deviation_score[abnormal])) + abnormal = abnormal[idx] + normal = np.where(np.abs(self.leaf_deviation_score) < self.leaf_deviation_score[abnormal[0]])[0] + # normal = np.setdiff1d(np.arange(len(self.derived_data)), abnormal, assume_unique=True) + # return np.intersect1d(normal, self.filtered_indices, assume_unique=True) + return normal + + def run(self): + if self.__finished: + logger.warning(f"try to rerun {self}") + return self + if self.option.enable_filter: + kpi_filter = KPIFilter(self._v, self._f) + self.filtered_indices = kpi_filter.filtered_indices + cluster_list = self.one_dim_cluster(self.leaf_deviation_score[self.filtered_indices]) + cluster_list = list( + [kpi_filter.inverse_map(_) for _ in cluster_list] + ) + cluster_list = list( + [list( + filter(lambda x: np.min(self.leaf_deviation_score[_]) <= self.leaf_deviation_score[x] <= np.max( + self.leaf_deviation_score[_]), np.arange(len(self._f))) + ) + for _ in cluster_list] + ) + self.cluster_list = cluster_list + else: + self.filtered_indices = np.ones(len(self._v), dtype=bool) + self.cluster_list = self.one_dim_cluster(self.leaf_deviation_score) + + self.locate_root_cause() + self.__finished = True + self._root_cause = self._root_cause + return self + + def _locate_in_cuboid(self, cuboid, indices, **params) -> Tuple[FrozenSet[AC], float]: + """ + :param cuboid: try to find root cause in this cuboid + :param indices: anomaly leaf nodes' indices + :return: root causes and their score + """ + # mu = params.get("mu") + # sigma = params.get("sigma") + data_cuboid_indexed = self.get_indexed_data(cuboid) + logger.debug(f"current cuboid: {cuboid}") + + abnormal_cuboid_ac_arr = self.get_cuboid_ac_array(cuboid)[indices] + elements, num_elements = np.unique(abnormal_cuboid_ac_arr, return_counts=True) + + num_ele_descents = np.asarray(list( + np.count_nonzero( + _.index_dataframe(data_cuboid_indexed), + ) for _ in elements + )) + + # sort reversely by descent score + descent_score = num_elements / np.maximum(num_ele_descents, 1e-4) + idx = np.argsort(descent_score)[::-1] + elements = elements[idx] + num_ele_descents = num_ele_descents[idx] + num_elements = num_elements[idx] + + # descent_score = descent_score[idx] + del descent_score + + logger.debug(f"elements: {';'.join(str(_) for _ in elements)}") + + def _root_cause_score(partition: int) -> float: + dis_f = cityblock + data_p, data_n = self.get_derived_dataframe( + frozenset(elements[:partition]), cuboid=cuboid, + reduction=lambda x: x, return_complement=True, + subset_indices=np.concatenate([indices, self.normal_indices])) + assert len(data_p) + 
len(data_n) == len(indices) + len(self.normal_indices), \ + f'{len(data_n)} {len(data_p)} {len(indices)} {len(self.normal_indices)}' + # dp = self.__deviation_score(data_p['real'].values, data_p['predict'].values) + # dn = self.__deviation_score(data_n['real'].values, data_n['predict'].values) if len(data_n) else [] + # log_ll = np.mean(norm.pdf(dp, loc=mu, scale=sigma)) \ + # + np.mean(norm.pdf(dn, loc=0, scale=self.option.normal_deviation_std)) + _abnormal_descent_score = np.sum(num_elements[:partition]) / np.sum(num_ele_descents[:partition]) + _normal_descent_score = 1 - np.sum(num_elements[partition:] / np.sum(num_ele_descents[partition:])) + _ds = _normal_descent_score * _abnormal_descent_score + succinct = partition + len(cuboid) * len(cuboid) + _pv, _pf = np.sum(data_p.real.values), np.sum(data_p.predict.values) + _lp = len(data_p) + _v1, _v2 = data_p.real.values, data_n.real.values + _v = np.concatenate([_v1, _v2]) + _f1, _f2 = data_p.predict.values, data_n.predict.values + _f = np.concatenate([_f1, _f2]) + + # TODO: fixed known issue + reduced_data_p, _ = self.get_derived_dataframe( + frozenset(elements[:partition]), cuboid=cuboid, + reduction="sum", return_complement=True, + subset_indices=np.concatenate([indices, self.normal_indices])) + if len(reduced_data_p): + _a1, _a2 = data_p.predict.values * ( + reduced_data_p.real.item() / reduced_data_p.predict.item() + ), data_n.predict.values + else: + assert len(data_p) == 0 + _a1 = 0 + _a2 = data_n.predict.values + + # Original: + # _a1, _a2 = data_p.predict.values * (_pv / _pf), data_n.predict.values + + _a = np.concatenate([_a1, _a2]) + divide = lambda x, y: x / y if y > 0 else (0 if x == 0 else float('inf')) + _ps = 1 - (divide(dis_f(_v1, _a1), len(_v1)) + divide(dis_f(_v2, _f2), len(_v2))) \ + / (divide(dis_f(_v1, _f1), len(_v1)) + divide(dis_f(_v2, _f2), len(_v2))) + logger.debug( + f"partition:{partition} " + # f"log_ll:{log_ll} " + # f"impact: {impact_score} " + f"succinct: {succinct} " + f"ps: {_ps}" + ) + # return _p * self.option.score_weight / (-succinct) + return _ps + + partitions = np.arange( + min( + len(elements), + self.option.max_num_elements_single_cluster, + len(set(self.get_indexed_data(cuboid).index.values)) - 1 + ) + ) + 1 + if len(partitions) <= 0: + return elements, float('-inf') + rc_scores = np.asarray(list(map(_root_cause_score, partitions))) + idx = np.argsort(rc_scores)[::-1] + partitions = partitions[idx] + rc_scores = rc_scores[idx] + + score = rc_scores[0] + rc = elements[:partitions[0].item()] + logger.debug(f"cuboid {cuboid} gives root cause {AC.batch_to_string(rc)} with score {score}") + return rc.tolist(), score + + def _locate_in_cluster(self, indices: np.ndarray): + """ + :param indices: indices of leaf nodes in this cluster + :return: None + """ + mu = np.mean(self.leaf_deviation_score[indices]) + sigma = np.maximum(np.std(self.leaf_deviation_score[indices]), 1e-4) + logger.debug(f"locate in cluster: {mu}(+-{sigma})") + max_cuboid_layer = len(self.attribute_names) + ret_lists = [] + for cuboid_layer in np.arange(max_cuboid_layer) + 1: + layer_ret_lists = list(map( + lambda x, _i=indices, _mu=mu, _sigma=sigma: self._locate_in_cuboid(x, indices=_i, mu=_mu, sigma=_sigma), + combinations(self.attribute_names, cuboid_layer) + )) + ret_lists.extend([ + { + 'rc': x[0], 'score': x[1], 'n_ele': len(x[0]), 'layer': cuboid_layer, + 'rank': x[1] * self.option.score_weight - len(x[0]) * cuboid_layer + } for x in layer_ret_lists + ]) + if len(list(filter(lambda x: x['score'] > self.option.ps_upper_bound, 
ret_lists))): + break + ret_lists = list(sorted( + ret_lists, + key=lambda x: x['rank'], + reverse=True) + ) + if ret_lists: + ret = ret_lists[0]['rc'] + logger.debug( + f"find root cause: {AC.batch_to_string(ret)}, rank: {ret_lists[0]['rank']}, score: {ret_lists[0]['score']}") + self._root_cause.append(frozenset(ret)) + else: + logger.info("failed to find root cause") + + def locate_root_cause(self): + if not self.cluster_list: + logger.info("We do not have abnormal points") + return + if self.option.score_weight == 'auto': + # TODO: Using the revised score formula + # see: https://github.com/NetManAIOps/Squeeze/issues/6 + g_cluster = np.log(len(self.cluster_list) + 1) / len(self.cluster_list) + num_attr = sum([len(attrs) for attrs in self.attribute_values]) + g_attribute = num_attr / np.log(num_attr + 1) + g_coverage = -np.log(sum([len(cluster) for cluster in self.cluster_list]) / len(self.data_list[0])) + self.option.score_weight = g_cluster * g_attribute * g_coverage + + # Original: + #self.option.score_weight = - np.log( + # len(self.cluster_list) * + # sum(len(_) for _ in self.cluster_list) / len(self._f)) / np.log( + # sum(len(_) for _ in self.attribute_values)) * sum(len(_) for _ in self.attribute_values) + #self.option.score_weight = len(self.cluster_list) * \ + # (np.log(sum(len(_) for _ in self.cluster_list)) + np.sum( + # [np.log(len(_)) for _ in self.attribute_values]) - np.log( + # len(self.cluster_list)) - np.log(len(self.leaf_deviation_score))) \ + # / np.log(np.mean([len(_) for _ in self.attribute_values])) * 10 + logger.debug(f"auto score weight: {self.option.score_weight}") + for indices in self.cluster_list: + self._locate_in_cluster(indices) + + @property + @lru_cache() + def leaf_deviation_score(self): + with np.errstate(divide='ignore', invalid='ignore'): + deviation_scores = self.__deviation_score(self._v, self._f) + assert np.shape(deviation_scores) == np.shape(self._v) == np.shape(self._f) + assert np.sum(np.isnan(deviation_scores)) == 0, \ + f"there are nan in deviation score {np.where(np.isnan(deviation_scores))}" + assert np.sum(~np.isfinite(deviation_scores)) == 0, \ + f"there are infinity in deviation score {np.where(~np.isfinite(deviation_scores))}" + logger.debug(f"anomaly ratio ranges in [{np.min(deviation_scores)}, {np.max(deviation_scores)}]") + return deviation_scores + + def get_derived_dataframe(self, ac_set: Union[FrozenSet[AC], None], cuboid: Tuple[str] = None, + reduction=None, return_complement=False, subset_indices=None): + subset = np.zeros(len(self.data_list[0]), dtype=np.bool) + if subset_indices is not None: + subset[subset_indices] = True + else: + subset[:] = True + + if reduction == "sum": + reduce = lambda x, _axis=0: np.sum(x, axis=_axis, keepdims=True) + else: + reduce = lambda x: x + + if ac_set is None: + idx = np.ones(shape=(len(self.data_list[0]),), dtype=np.bool) + else: + idx = AC.batch_index_dataframe(ac_set, self.get_indexed_data(cuboid)) + + def _get_ret(_data_list): + if len(_data_list[0]) == 0: + return pd.DataFrame(data=[], columns=['real', 'predict']) + _values = self.op(*[reduce(_data[["real", "predict"]].values) for _data in _data_list]) + if np.size(_values) == 0: + _values = [] + if reduction == 'sum': + _ret = pd.DataFrame(data=_values, columns=['real', 'predict']) + else: + _ret = _data_list[0].copy(deep=True) + _ret[['real', 'predict']] = _values + return _ret + + data_list = list(_[idx & subset] for _ in self.data_list) + if not return_complement: + return _get_ret(data_list) + complement_data_list = list(_[(~idx) & 
subset] for _ in self.data_list) + return _get_ret(data_list), _get_ret(complement_data_list) + + @staticmethod + def __deviation_score(v, f): + n = 1 + with np.errstate(divide='ignore'): + ret = n * (f - v) / (n * f + v) + # ret = np.log(np.maximum(v, 1e-10)) - np.log(np.maximum(f, 1e-10)) + # ret = (2 * sigmoid(1 - v / f) - 1) + # k = np.log(np.maximum(v, 1e-100)) - np.log(np.maximum(f, 1e-100)) + # ret = (1 - k) / (1 + k) + ret[np.isnan(ret)] = 0. + return ret diff --git a/algorithms/squeeze/squeeze_option.py b/algorithms/squeeze/squeeze_option.py new file mode 100644 index 0000000..8084955 --- /dev/null +++ b/algorithms/squeeze/squeeze_option.py @@ -0,0 +1,36 @@ +class SqueezeOption: + def __init__(self, **kwargs): + self.debug = False + + # Filter + self.enable_filter = True + + # Density Estimation + self.cluster_method = "density" + self.density_estimation_method = 'histogram' + + # KDE + self.density_smooth_conv_kernel = [1.] + self.kde_bw_method = None + self.kde_weights = None + + # Histogram + self.histogram_bar_width = "auto" + + # relative max + self.max_allowed_deviation_bias = 0.10 + self.max_allowed_deviation_std = 0.01 + + # Cluster + self.cluster_smooth_window_size = "auto" + self.max_normal_deviation = 0.20 + + # Group + # self.least_score = 2.0 + self.least_descent_score = 0.6 + self.normal_deviation_std = 0.1 + self.score_weight = "auto" + self.max_num_elements_single_cluster = 12 + self.ps_upper_bound = 0.90 + + self.__dict__.update(kwargs) diff --git a/data/README.txt b/data/README.txt new file mode 100644 index 0000000..c1cce5a --- /dev/null +++ b/data/README.txt @@ -0,0 +1 @@ +Add any datasets to this directory diff --git a/generate_dataset.py b/generate_dataset.py new file mode 100644 index 0000000..fb5c6f1 --- /dev/null +++ b/generate_dataset.py @@ -0,0 +1,313 @@ +import os +import argparse +import itertools +import numpy as np +import pandas as pd +from functools import reduce +from utils.utils import str2bool, limited_number_type + +parser = argparse.ArgumentParser(description='Generate synthetic dataset.') +parser.add_argument('--num', required=True, type=int, help='number of instances to generate') +parser.add_argument('--dims', type=int, nargs='+', default=[10, 12, 10, 8, 5], help='dimension sizes') +parser.add_argument('--seed', type=int, default=123, help='initial random seed') +parser.add_argument('--data-root', type=str, default='./data', help='output directory') +parser.add_argument('--dataset-name', type=str, help='name of the new dataset') +parser.add_argument('--weibull-alpha', type=limited_number_type(float, minimum=0), nargs=2, default=[0.5, 1.0], + metavar=('min', 'max'), + help='range of Weibull alpha to control the real value distribution of normal leaf elements') +parser.add_argument('--zero-rate', type=limited_number_type(float, 0, 1), nargs=2, default=[0.0, 0.25], + metavar=('min', 'max'), help='range of percentage of leaf elements to set to zero') +parser.add_argument('--noise-level', type=limited_number_type(float, 0, 1), nargs=2, default=[0.0, 0.25], + metavar=('min', 'max'), help='range of relative forecasting error of the normal leaf elements') +parser.add_argument('--anomaly-severity', type=limited_number_type(float, 0, 1), nargs=2, default=[0.2, 1.0], + metavar=('min', 'max'), help='range of severity of the anomalies') +parser.add_argument('--anomaly-deviation', type=limited_number_type(float, 0, 1), nargs=2, default=[0.0, 0.1], + metavar=('min', 'max'), help='range of deviation of the anomalies') +parser.add_argument('--num-anomaly', 
type=limited_number_type(int, minimum=1), nargs=2, default=[1, 3], + metavar=('min', 'max'), help='range of number of anomalies') +parser.add_argument('--num-anomaly-elements', type=limited_number_type(int, minimum=1), nargs=2, default=[1, 3], + metavar=('min', 'max'), help='range of number of elements within each anomaly') +parser.add_argument('--only-last-layer', type=str2bool, nargs='?', const=True, default=False, + help='place all anomalies in the highest layer') +args = parser.parse_args() + +# S: 121, L: 122, H: 123 +rng = np.random.default_rng(args.seed) + +# Basic settings +# S: dimensions = {'a': 10, 'b': 12, 'c': 10, 'd': 8, 'e': 5} +# L: dimensions = {'a': 10, 'b': 24, 'c': 10, 'd': 15} +# 2: dimensions = {'a': 10, 'b': 5, 'c': 250, 'd': 20, 'e': 8, 'f': 12} +dimensions = dict(zip('abcdefghijklmnopqrstuvwxyz', args.dims)) + +# S and L uses 1000, H dataset uses 100 +number_of_files = args.num + +# The normal ('real') value distribution +weibull_alpha = [min(args.weibull_alpha), max(args.weibull_alpha)] + +# Noise parameters +zero_rate = [min(args.zero_rate), max(args.zero_rate)] +noise_level = [min(args.noise_level), max(args.noise_level)] # S,H: [0.0,0.25], L: [0.0,0.1] + +# Anomaly parameters +anomaly_severity = [min(args.anomaly_severity), max(args.anomaly_severity)] # S,H: [0.2,1.0], L: [0.5,1.0] +anomaly_deviation = [min(args.anomaly_deviation), max(args.anomaly_deviation)] # S,H: [0.0,0.1], L: [0.0,0.0] +num_anomaly = [min(args.num_anomaly), max(args.num_anomaly)] # S,H: [1,3], L: [1,5] +num_anomaly_elements = [min(args.num_anomaly_elements), max(args.num_anomaly_elements)] # S,H: [1,3], L: [1,1] +only_last_layer = args.only_last_layer # S,H: False, L: True + +# Set the output path +data_root = args.data_root +if args.dataset_name is None: + folder = '-'.join(sorted([k + str(v) for k, v in dimensions.items()])) +else: + folder = args.dataset_name +save_path = os.path.join(data_root, folder) +os.makedirs(save_path, exist_ok=True) + + +def get_dataset_properties(dimensions): + sel_zero_rate = rng.uniform(zero_rate[0], zero_rate[1]) + sel_noise_level = rng.uniform(noise_level[0], noise_level[1]) + + print('zero_rate', sel_zero_rate) + print('noise_level', sel_noise_level) + + sel_num_anomalies = rng.integers(num_anomaly[0], num_anomaly[1], endpoint=True) + print('num_anomalies', sel_num_anomalies) + + anomaly_properties = dict() + for i in range(sel_num_anomalies): + if not only_last_layer: + anomaly_level = rng.integers(1, len(dimensions), endpoint=True) + else: + anomaly_level = len(dimensions) + + elements = rng.integers(num_anomaly_elements[0], num_anomaly_elements[1], endpoint=True) + + # Add noise_level to the severity to avoid anomalies too alike to normal data. 
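+        # Note: the sampled severity later serves as the mean, and the sampled deviation as the standard deviation, of the per-element scaling factor r drawn in scale_anomaly (each anomalous value is scaled by 1 - r and clipped at 0).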
+ severity = rng.uniform(anomaly_severity[0], anomaly_severity[1]) + sel_noise_level + deviation = rng.uniform(anomaly_deviation[0], anomaly_deviation[1]) + anomaly_properties[i] = {'level': anomaly_level, 'elements': elements, + 'severity': severity, 'deviation': deviation} + + print('anomaly_properties', anomaly_properties) + return sel_zero_rate, sel_noise_level, anomaly_properties + + +def get_anomaly_locations(df, dimensions, anomaly_properties): + def _get_anomaly_locations(anomaly_level, anomaly_elements, current_anomalies, depth=0): + if len(dimensions) < anomaly_level: + raise ValueError("anaomly_level should be less or equal to number of dimensions.") + + if np.prod(sorted(list(dimensions.values()), reverse=True)[:anomaly_level]) < anomaly_elements: + raise ValueError("Impossible to get that many anomaly elements with specified input.") + + anomaly_dims = sorted(rng.choice(list(dimensions.keys()), size=anomaly_level, replace=False)) + lowest_layer = len(anomaly_dims) == len(dimensions) + + # Do not reuse the same anomaly dimension again + used_dims = [ca['dimensions'] for ca in current_anomalies] + if anomaly_dims in used_dims and not lowest_layer: + print('Anomaly dimension used, recursive call.') + return _get_anomaly_locations(anomaly_level, anomaly_elements, current_anomalies) + + all_selected_dim_elements = [] + for ad in anomaly_dims: + dim_elements = list(range(1, dimensions[ad] + 1)) + + # We do not want any overlap on current anomalies + # For example, if one anomaly is c=c5 then we do not want an overlapping one at e.g. c=c5&b=b3. + for ca in current_anomalies: + if ad in ca['dimensions']: + # Overlap + idx = ca['dimensions'].index(ad) + used_elements = set([int(a[idx][len(ad):]) for a in ca['cuboids']]) + dim_elements = list(set(dim_elements) - used_elements) + + if len(dim_elements) == 0: + print('All elements in dimension used, recursive call.') + return _get_anomaly_locations(anomaly_level, anomaly_elements, current_anomalies, depth=depth+1) + + selected_dim_elements = rng.choice(dim_elements, size=anomaly_elements, replace=True) + selected_dim_elements = [ad + str(e) for e in selected_dim_elements] + all_selected_dim_elements.append(selected_dim_elements) + + anomaly_cuboids = list(zip(*all_selected_dim_elements)) + + # When in the lowest layer, we do not want any anomalies with real == 0 and predict == 0, + # since we can't predict these (i.e., they would automatically be a false negative when evaluating). 
+ if lowest_layer: + for cuboid in anomaly_cuboids: + element = list(zip(anomaly_dims, cuboid)) + v = df.loc[reduce(lambda c1, c2: c1 & c2, [df[k] == v for k, v in element]), 'real'].iloc[0] + if v == 0: + print('Tried anomaly value is 0, recursive call.') + return _get_anomaly_locations(anomaly_level, anomaly_elements, current_anomalies) + + # Make sure the anomalies are unique + if len(np.unique(anomaly_cuboids, axis=0)) < anomaly_elements: + print('Non-unique elements found, recursive call.') + return _get_anomaly_locations(anomaly_level, anomaly_elements, current_anomalies) + + return anomaly_dims, anomaly_cuboids + + anomalies = [] + for properties in anomaly_properties.values(): + level = properties['level'] + elements = properties['elements'] + anomaly_dims, anomaly_cuboids = _get_anomaly_locations(level, elements, anomalies) + anomalies.append({'dimensions': anomaly_dims, 'cuboids': anomaly_cuboids}) + return anomalies + + +def get_anomaly_masks(df, anomalies): + anomaly_masks = [] + for anomaly in anomalies: + dims = anomaly['dimensions'] + + cuboid_mask = [] + for cuboid in anomaly['cuboids'] : + tup = zip(dims, cuboid) if len(dims) > 1 else [(dims[0], cuboid[0])] + masks = [df[d] == c for d, c in tup] + mask = np.logical_and.reduce(masks) + cuboid_mask.append(mask) + cuboid_mask = np.logical_or.reduce(cuboid_mask) + + anomaly_masks.append(cuboid_mask) + return anomaly_masks + + +def scale_anomaly(row, properties): + severity = properties['severity'] + deviation = properties['deviation'] + + r = rng.normal(loc=severity, scale=deviation) + v = max(row * (1 - r), 0.0) + return v + + +def generate_labels(anomalies): + labels = [] + for anomaly in anomalies: + dims = anomaly['dimensions'] + for element in anomaly['cuboids']: + ts = zip(dims, element) if len(dims) > 1 else [(dims[0], element[0])] + label = "&".join(sorted(["=".join(t) for t in ts])) + labels.append(label) + labels = ';'.join(labels) + return labels + + +def create_metadata(df, anomaly_masks, anomaly_properties, zero_rate, noise_level, direction): + mask = np.logical_or.reduce(anomaly_masks) + normal_predict_amount = df.loc[~mask, 'predict'].abs().sum() + normal_predict_error = (df.loc[~mask, 'real'] - df.loc[~mask, 'predict']).abs().sum() + + abnormal_predict_amount = df.loc[mask, 'predict'].abs().sum() + abnormal_predict_error = (df.loc[mask, 'real'] - df.loc[mask, 'predict']).abs().sum() + anomaly_significance = abnormal_predict_error / df['predict'].sum() + + anomaly_severities = ';'.join([str(round(anomaly_properties[i]['severity'], 2)) for i in range(len(anomaly_masks))]) + anomaly_deviations = ';'.join([str(round(anomaly_properties[i]['deviation'], 2)) for i in range(len(anomaly_masks))]) + anomaly_elements = ';'.join([str(round(anomaly_properties[i]['elements'], 2)) for i in range(len(anomaly_masks))]) + metadata = {'total_real_amount': df['real'].sum().round(2), + 'total_predict_amount': df['predict'].sum().round(2), + 'normal_predict_amount': round(normal_predict_amount, 2), + 'normal_predict_error': round(normal_predict_error, 2), + 'abnormal_predict_amount': round(abnormal_predict_amount, 2), + 'abnormal_predict_error': round(abnormal_predict_error, 2), + 'anomaly_significance': round(anomaly_significance, 2), + 'zero_rate': round(zero_rate, 2), + 'noise_level': round(noise_level, 2), + 'elements_per_anomaly': anomaly_elements, + 'anomaly_severity': anomaly_severities, + 'anomaly_deviation': anomaly_deviations, + 'anomaly_direction': direction + } + return metadata + + +def generate_dataset(dimensions): + 
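+    """Generate one synthetic instance: the leaf-level dataframe with 'real' and 'predict' values (including the injected anomalies), its ground-truth label string, and a metadata dictionary."""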
sel_zero_rate, sel_noise_level, anomaly_properties = get_dataset_properties(dimensions) + dimension_values = [[dimension + str(i) for i in range(1, num + 1)] for dimension, num in dimensions.items()] + + # Add all combinations + df = pd.DataFrame(list(itertools.product(*dimension_values)), columns=dimensions.keys()) + + # Add real values + alpha = rng.uniform(weibull_alpha[0], weibull_alpha[1]) + df['real'] = rng.weibull(alpha, len(df)) * 100 + print('Added real') + + # Add zero rows + df['real'] = df['real'] * (rng.uniform(size=len(df)) > sel_zero_rate) + print('Added zero rows') + + # Apply noise to the predictions + df['predict'] = df['real'] + df['real'] * rng.normal(loc=0, size=len(df), scale=sel_noise_level) + print('Added noise') + + # Swap for equal distribution on both sides + p = df['predict'].copy() + r = rng.integers(0, 1, size=len(df), endpoint=True) + df['predict'] = np.where(r == 1, df['real'], df['predict']) + df['real'] = np.where(r == 0, df['real'], p) + df.loc[df['predict'] < 0, 'predict'] = 0.0 + del p + del r + print('Swap predict/real') + + # Create and add anomalies + anomalies = get_anomaly_locations(df, dimensions, anomaly_properties) + print('anomalies', anomalies) + + anomaly_masks = get_anomaly_masks(df, anomalies) + + # We set the direction following the prediction error direction in the normal data. + # This is to make sure that the error direction in the normal data does not overweigh + # the error introduced by the anomalies. + direction = 1 if df['real'].sum() > df['predict'].sum() else 0 + + for i, anomaly_mask in enumerate(anomaly_masks): + properties = anomaly_properties[i] + if direction == 0: + df.loc[anomaly_mask, 'real'] = df.loc[anomaly_mask, 'predict'] # reset the noise + df.loc[anomaly_mask, 'real'] = df.loc[anomaly_mask, 'real'].apply(scale_anomaly, properties=properties) + else: + df.loc[anomaly_mask, 'predict'] = df.loc[anomaly_mask, 'real'] # reset the noise + df.loc[anomaly_mask, 'predict'] = df.loc[anomaly_mask, 'predict'].apply(scale_anomaly, properties=properties) + + labels = generate_labels(anomalies) + metadata = create_metadata(df, anomaly_masks, anomaly_properties, sel_zero_rate, sel_noise_level, direction) + return df, labels, metadata + + +def generate_filename(n, used_names): + filename = 0 + while filename == 0 or filename in used_names: + range_start = 10**(n-1) + range_end = (10**n)-1 + filename = str(rng.integers(range_start, range_end)) + return filename + + +if __name__ == "__main__": + used_names = set() + dataset_info = dict() + for i in range(number_of_files): + filename = generate_filename(6, used_names) + used_names.add(filename) + + print('creating file', filename) + df, labels, metadata = generate_dataset(dimensions) + df.to_csv(os.path.join(save_path, filename + '.csv'), index=False) + dataset_info[filename] = {'set': labels, **metadata} + del df + print('-----------------------') + + # Save inside the loop for intermediate results + df_info = pd.DataFrame.from_dict(dataset_info, orient='index') + df_info = df_info.rename_axis('timestamp').reset_index() + df_info.to_csv(os.path.join(save_path, 'injection_info.csv'), index=False) diff --git a/run.py b/run.py new file mode 100644 index 0000000..f8f9c08 --- /dev/null +++ b/run.py @@ -0,0 +1,109 @@ +import os +import time +import pandas as pd +from multiprocessing import Pool +from utils.argument_parser import get_input_arguments +from utils.run_utils import get_instances, read_dataframe, run_method, result_post_processing +from utils.evaluation import 
root_cause_postprocessing, score_root_causes + + +def run_directory(data_root, run_path, algorithm, algorithm_args, derived, n_threads, csv_suffix, debug): + """ + Run all files in all subdirectories of run_path. + """ + parallel_run_results = [] + + def parallel_callback(result): + parallel_run_results.append(result) + + instances = get_instances(data_root, run_path) + + pool = Pool(n_threads) + for dataset, sub_directory, file in instances: + + if derived is None: + derived = os.path.basename(dataset) == 'D' + + pool.apply_async(run_instance, + args=(data_root, dataset, sub_directory, file, algorithm, algorithm_args, derived, debug), + callback=parallel_callback) + pool.close() + pool.join() + + result_post_processing(parallel_run_results, algorithm, csv_suffix) + + +def run_single_file(data_root, run_path, algorithm, algorithm_args, derived): + """ + Run a single file. + """ + directory_structure = list(filter(None, run_path.split(os.sep))) + dataset_name = directory_structure[0] if len(directory_structure) > 1 else '' + sub_directory = os.path.join(*directory_structure[1:-1]) if len(directory_structure) > 2 else '' + file = directory_structure[-1].split('.')[0] + + if derived is None: + derived = dataset_name == 'D' + + run_instance(data_root, dataset_name, sub_directory, file, algorithm, algorithm_args, derived, debug=True) + + +def run_instance(data_root, dataset_name, sub_directory, file, algorithm, algorithm_args, derived=False, debug=False): + """ + Runs a single instance (file) and evaluates the result. + :param data_root: str, the root directory for all datasets. + :param dataset_name: str, the name of the dataset to run (must be located within data_root). + :param sub_directory: str, subdirectory of the dataset (can be an empty string or of a depth >= 1). + :param file: str, the file to run. Should not have any file extension (assumed to be csv). + :param algorithm: str, the name of the algorithm that should be run. + :param algorithm_args: dict, any algorithm specific arguments. + :param derived: boolean, if the dataset is derived. + In this case, two files `file`.a.csv and `file`.b.csv. must exist. + :param debug: boolean, if debug mode should be used. + :return: (str, str, str, float, float, float, float, float), the dataset name, subdirectory and file name + are all returned for collecting the results when using multiple threads. Moreover, the F1-score, + true positive count, false positive count, false negative count and the run time are also returned. + """ + run_directory = os.path.join(data_root, dataset_name, sub_directory) + print('Running file:', os.path.join(run_directory, file), ', derived:', derived) + + df, attributes, df_a, df_b = read_dataframe(run_directory, file, derived) + start_time = time.time() + root_causes = run_method(df, [df_a, df_b], attributes, algorithm, algorithm_args, derived, debug) + root_cause_predictions = root_cause_postprocessing(root_causes, algorithm) + run_time = time.time() - start_time + + # Get the label. + labels = pd.read_csv(os.path.join(run_directory, 'injection_info.csv')) + label = labels.loc[labels['timestamp'] == int(file), 'set'].iloc[0] + + # Evaluate the root cause. 
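+    # score_root_causes counts exact matches between the predicted and ground-truth root-cause sets; the instance-level F1 below is computed directly from these TP/FP/FN counts.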
+ TP, FP, FN, true_labels = score_root_causes(root_cause_predictions, label) + F1 = 2 * TP / (2 * TP + FP + FN) + + print('dataset:', dataset_name, 'sub_directory:', sub_directory, 'file:', file, 'label:', label) + if debug or FP > 0 or FN > 0: + print('Run time:', run_time) + print('TP:', TP, 'FP:', FP, 'FN:', FN) + print('True labels: ', true_labels) + print('Predicted labels:', root_cause_predictions) + + return dataset_name, sub_directory, file, F1, TP, FP, FN, run_time + + +if __name__ == "__main__": + + # Get the parsed input arguments. + args, data_root, run_path, algorithm_args, is_single_file = get_input_arguments() + + print('Running', args.algorithm, 'with arguments:', algorithm_args) + if is_single_file: + run_single_file(data_root, run_path, args.algorithm, algorithm_args, args.derived) + else: + # Add algorithm specific arguments to the given csv suffix. + argument_list = [k + '-' + str(v).replace('.', '') for k, v in algorithm_args.items()] + csv_suffix = '-'.join(['', args.output_suffix, *argument_list]) + csv_suffix = csv_suffix if args.output_suffix != '' else csv_suffix[1:] + + run_directory(data_root, run_path, args.algorithm, algorithm_args, args.derived, args.n_threads, csv_suffix, + args.debug) diff --git a/utils/argument_parser.py b/utils/argument_parser.py new file mode 100644 index 0000000..cbc0e18 --- /dev/null +++ b/utils/argument_parser.py @@ -0,0 +1,142 @@ +import os +import argparse + + +def get_input_arguments(): + """ + Construct argument parser. + :return: argparse.ArgumentParser + """ + # Parent parser + parser = argparse.ArgumentParser(description='RiskLoc') + subparsers = parser.add_subparsers(help='algorithm specific help', dest='algorithm', required=True) + common_arguments = ['algorithm', 'data_root', 'run_path', 'derived', 'n_threads', 'output_suffix', 'debug'] + + # Riskloc algorithm specific parameters + subparser_riskloc = subparsers.add_parser('riskloc', help='riskloc help') + subparser_riskloc = add_common_arguments(subparser_riskloc) + subparser_riskloc.add_argument('--risk-threshold', type=float, default=0.5, help='risk threshold') + subparser_riskloc.add_argument('--pep-threshold', type=float, default=0.02, + help='proportional explanatory power threshold') + subparser_riskloc.add_argument('--prune-elements', type=str2bool, nargs='?', const=True, default=True, + help='use element pruning (True/False)') + + # AutoRoot algorithm specific parameters + subparser_autoroot = subparsers.add_parser('autoroot', help='autoroot help') + subparser_autoroot = add_common_arguments(subparser_autoroot) + subparser_autoroot.add_argument('--delta-threshold', type=float, default=0.25, help='delta threshold') + + # Squeeze algorithm specific parameters + subparser_squeeze = subparsers.add_parser('squeeze', help='squeeze help') + subparser_squeeze = add_common_arguments(subparser_squeeze) + subparser_squeeze.add_argument('--ps-upper-bound', type=float, default=0.9, help='threshold') + subparser_squeeze.add_argument('--max-num-elements-single-cluster', type=int, default=12, + help='maximum number of elements returned for a cluster') + + # HotSpot algorithm specific parameters + subparser_hotspot = subparsers.add_parser('hotspot', help='autoroot help') + subparser_hotspot = add_common_arguments(subparser_hotspot) + subparser_hotspot.add_argument('--pt', type=float, default=0.8, help='PT threshold') + subparser_hotspot.add_argument('--m', type=float, default=200, help='maximum number of MCTS iterations') + subparser_hotspot.add_argument('--scoring', type=str, 
default='gps', choices=['gps', 'ps'], + help='scoring method to use') + + # R_adtributor algorithm specific parameters + subparser_r_adtributor = subparsers.add_parser('r_adtributor', help='r_adtributor help') + subparser_r_adtributor = add_common_arguments(subparser_r_adtributor) + subparser_r_adtributor.add_argument('--teep', type=float, default=0.2, + help='per-element explanatory power threshold') + subparser_r_adtributor.add_argument('--k', type=int, default=3, help='number of returned root cause elements') + + # Adtributor algorithm specific parameters + subparser_adtributor = subparsers.add_parser('adtributor', help='adtributor help') + subparser_adtributor = add_common_arguments(subparser_adtributor) + subparser_adtributor.add_argument('--tep', type=float, default=0.1, help='total explanatory power threshold') + subparser_adtributor.add_argument('--teep', type=float, default=0.1, + help='per-element explanatory power threshold') + subparser_adtributor.add_argument('--k', type=int, default=3, help='number of returned root cause elements') + + args = parser.parse_args() + data_root, run_path, algorithm_args, is_single_file = process_arguments(args, common_arguments) + return args, data_root, run_path, algorithm_args, is_single_file + + +def add_common_arguments(parser): + """ + Adds shared arguments to a parser. + :param parser: argparse.ArgumentParser, the parser to use. + :return: parser with added arguments. + """ + parser.add_argument('--data-root', type=str, default='./data', + help='root directory for all datasets (default ./data)') + parser.add_argument('--run-path', type=str, default='./data', + help='''directory or file to be run; + if a directory, any subdirectories will be considered as well; + must contain data-path as a prefix + ''') + parser.add_argument('--derived', type=str2bool, nargs='?', const=True, default=None, + help='derived dataset (defaults to True for the D dataset and False for others)') + parser.add_argument('--n-threads', type=int, default=10, help='number of threads to run') + parser.add_argument('--output-suffix', type=str, default='', help='suffix for output csv file') + parser.add_argument('--debug', type=str2bool, nargs='?', const=True, default=False, + help='debug mode') + return parser + + +def process_arguments(args, common_arguments): + """ + Preprocessing for the run arguments. Normalizes paths and checks their validity. + Obtains all algorithm specific arguments and checks whether a single file or a directory should be run. + :param args: namespace, the run arguments. + :param common_arguments: list of strings, all argument names that are shared between the algorithms. + :return: (str, str, dict, boolean), normalized data root directory, normalized run path with root removed, + algorithm specific arguments, if running a single file. + """ + data_root = os.path.normpath(args.data_root).lstrip('/') + run_path = os.path.normpath(args.run_path).lstrip('/') + if not run_path.startswith(data_root): + raise argparse.ArgumentTypeError(f'data-root ({args.data_root}) must be prefix of run-path ({args.run_path}).') + + run_path = run_path[len(data_root) + 1:] + + # Get all algorithm specific arguments + algorithm_args = {k: v for k, v in vars(args).items() if k not in common_arguments} + + is_single_file = run_path.endswith('csv') + return data_root, run_path, algorithm_args, is_single_file + + +def limited_number_type(return_type=float, minimum=None, maximum=None): + """ + Helper function to process input arguments with a minimum and maximum value. 
+ :param return_type: the type of the return values, typically float or int. + :param minimum: float or None, the minimum value allowed. + :param maximum: float or None, the maximum value allowed. + :return: a function that will convert the input to `return_type` and check it's bounds. + """ + def range_checker(arg): + try: + n = return_type(arg) + except ValueError: + raise argparse.ArgumentTypeError(''.join(['Must be a ', str(return_type), ' number.'])) + if minimum is not None and n < minimum: + raise argparse.ArgumentTypeError(''.join(['Minimum value must be larger than ', str(minimum), '.'])) + if maximum is not None and n > maximum: + raise argparse.ArgumentTypeError(''.join(['Maximum value must be smaller than ', str(maximum), '.'])) + return n + return range_checker + + +def str2bool(argument): + """ + Helper function to process input arguments of boolean type. + :param argument: str + :return: boolean + """ + if argument.lower() in ('yes', 'true', 't', 'y', '1'): + return True + elif argument.lower() in ('no', 'false', 'f', 'n', '0'): + return False + else: + raise argparse.ArgumentTypeError('Unsupported value encountered.') + diff --git a/utils/dataset_statistics.py b/utils/dataset_statistics.py new file mode 100644 index 0000000..9203708 --- /dev/null +++ b/utils/dataset_statistics.py @@ -0,0 +1,105 @@ +import re +import os +import numpy as np +import pandas as pd + + +def set_label(df, label): + labels = label.split(';') + df['label'] = 'normal' + for label in labels: + vals = [l.split('=')[1] for l in label.split('&')] + cond = np.all([df[val[0]] == val for val in vals], axis=0) + df.loc[cond, 'label'] = ' & '.join(vals) + return df + + +dataset_path = "./data/" +folders = ['S', 'L', 'H'] + +deep_a = False + +normal_predict_amount = 0 +normal_predict_error = 0 +sigs = [] +num_files = 0 +for folder in folders: + info = os.path.join(dataset_path, folder, 'injection_info.csv') + df = pd.read_csv(info) + + print('folder', folder) + + if 'normal_predict_amount' in df.columns: + normal_predict_amount += df['normal_predict_amount'].sum() + normal_predict_error += df['normal_predict_error'].sum() + + print('residual', df['normal_predict_error'].sum() / df['normal_predict_amount'].sum() * 100) + res = df['normal_predict_error'] / df['normal_predict_amount'] * 100 + print('max residual', res.max()) + + if 'significance' in df.columns: # B datasets + significance = df['significance'].mean() + sigs.extend(df['significance'].values) + elif 'anomaly_significance' in df.columns: # synthetic + significance = df['anomaly_significance'].mean() + sigs.extend(df['anomaly_significance'].values) + else: # A and D + files = os.listdir(os.path.join(dataset, folder)) + + if folder.startswith('new_dataset_A'): + layer = int(re.search(r"layers?_(\d)", folder).group(1)) + elements = int(re.search(r"elements?_(\d)", folder).group(1)) + + print('layer', layer, 'elements', elements) + + if deep_a: + if layer <= 3 and elements <= 3: + print('lower') + continue + else: + if layer > 3 or elements > 3: + print('upper') + continue + + num_files += len(files) - 1 + + folder_sigs = [] + normal_predict_amount_file = 0 + normal_predict_error_file = 0 + for file in files: + if file == 'injection_info.csv' or file == 'truth_prediction.csv': + continue + + file_path = os.path.join(dataset_path, folder, file) + df_file = pd.read_csv(file_path) + + n = 4 if dataset_path[-1] == 'A' else 6 + label = df.loc[df['timestamp'] == int(file[:-n]), 'set'].iloc[0] + + df_file = set_label(df_file, label) + + mask = df_file['label'] 
!= 'normal' + + normal_predict_amount_file += df_file.loc[~mask, 'predict'].sum() + normal_predict_error_file += (df_file.loc[~mask, 'real'] - df_file.loc[~mask, 'predict']).abs().sum() + + abnormal_predict_error = (df_file.loc[mask, 'real'] - df_file.loc[mask, 'predict']).abs().sum() + sig = abnormal_predict_error / df_file['predict'].sum() + folder_sigs.append(sig) + + print('residual', normal_predict_error_file / normal_predict_amount_file * 100) + normal_predict_amount += normal_predict_amount_file + normal_predict_error += normal_predict_error_file + + significance = np.mean(folder_sigs) + sigs.extend(folder_sigs) + + print('mean significant', significance, '\n') + +residual = normal_predict_error / normal_predict_amount +print('total residual', residual * 100) + +print('total significance', np.mean(sigs)) + +print('num_files (for A & D)', num_files) + diff --git a/utils/element_scores.py b/utils/element_scores.py new file mode 100644 index 0000000..ed6e3e5 --- /dev/null +++ b/utils/element_scores.py @@ -0,0 +1,65 @@ +import numpy as np + + +def add_explanatory_power(df, derived): + """ + Computes the explanatory power for all elements in the dataframe. + :param df: pandas dataframe. + :param derived: boolean, if derived measures are used. + :return: pandas dataframe with added column for the explanatory power. + """ + if derived: + F_a = df['predict_a'].sum() + F_b = df['predict_b'].sum() + + n = (df['real_a'] - df['predict_a']) * F_b - (df['real_b'] - df['predict_b']) * F_a + d = F_b * (F_b + df['real_b'] - df['predict_b']) + df['ep'] = n / d + + # Normalize to sum up to 1 + df['ep'] = df['ep'] / df['ep'].sum() + else: + F = df['predict'].sum() + A = df['real'].sum() + + df['ep'] = (df['real'] - df['predict']) / (A - F) + return df + + +def add_surpise(df, derived, merged_divide=1): + """ + Computes the surprise for all elements in the dataframe. + :param df: pandas dataframe. + :param derived: boolean, if derived measures are used. + :param merged_divide: int, if the total sum should be divided + (this is true if the dataframe elements have been merged over the dimensions + as done in the adtributor code). + :return: dataframe with added column for the surprise. + """ + def compute_surprise(col_real, col_predict): + with np.errstate(divide='ignore'): + F = df[col_predict].sum() / merged_divide + A = df[col_real].sum() / merged_divide + + p = df[col_predict] / F + q = df[col_real] / A + p_term = np.nan_to_num(p * np.log(2 * p / (p + q))) + q_term = np.nan_to_num(q * np.log(2 * q / (p + q))) + surprise = 0.5 * (p_term + q_term) + return surprise + + if derived: + df['surprise'] = compute_surprise('real_a', 'predict_a') + compute_surprise('real_b', 'predict_b') + else: + df['surprise'] = compute_surprise('real', 'predict') + return df + + +def add_deviation_score(df): + """ + Computes the deviation score for all elements in the dataframe. + :param df: pandas dataframe. + :return: dataframe with added column for the deviation score. + """ + df['deviation'] = 2 * (df['predict'] - df['real']).divide(df['predict'] + df['real']).fillna(0.0) + return df diff --git a/utils/evaluation.py b/utils/evaluation.py new file mode 100644 index 0000000..67abb67 --- /dev/null +++ b/utils/evaluation.py @@ -0,0 +1,55 @@ +import numpy as np + + +def score_root_causes(root_cause_predictions, label): + """ + Evaluates an instances given the prediction and label. + :param root_cause_predictions: list of strings, the root cause predictions. + :param label: str, the ground-truth label. 
+ :return: (float, float, float, list of strings), true positive count, false positive count, false negative count, + and the ground-truth labels as a list. + """ + true_labels = label.split(';') + true_labels = ['&'.join(sorted(tl.split('&'))) for tl in true_labels] + true_labels = np.unique(true_labels) + + # Return early if pred_labels is empty to avoid FutureWarning + if len(root_cause_predictions) == 0: + return 0, 0, len(true_labels), true_labels + + TP, FN = 0, 0 + for true_label in true_labels: + if true_label in root_cause_predictions: + TP += 1 + else: + FN += 1 + + FP = max(len(root_cause_predictions) - TP, 0) + return TP, FP, FN, true_labels + + +def root_cause_postprocessing(root_causes, algorithm): + """ + Postprocessing for the returned root causes for unified evaluation as the return from + adtributor and squeeze are slightly different. + :param root_causes: list of string, all predicted root causes. + :param algorithm: str, the algorithm used. + :return: list of strings, the final root cause predictions in a unified format. + """ + # To make Adtibutor match other algorithms' outputs. + if algorithm == 'adtributor': + for rc in root_causes: + rc['elements'] = [[e] for e in rc['elements']] + rc['cuboid'] = [rc['dimension']] + + # To get strings (added here for uniformity since squeeze returns strings). + if algorithm != 'squeeze': + root_cause_predictions = [] + for rc in root_causes: + elems = np.array([d + '=' for d in rc['cuboid']], dtype=object) + np.array(rc['elements'], dtype=object) + root_cause_predictions.extend(['&'.join(e) for e in elems]) + else: + root_cause_predictions = root_causes + + root_cause_predictions = np.unique(root_cause_predictions) + return root_cause_predictions \ No newline at end of file diff --git a/utils/run_utils.py b/utils/run_utils.py new file mode 100644 index 0000000..33b26d9 --- /dev/null +++ b/utils/run_utils.py @@ -0,0 +1,141 @@ +import os +import pandas as pd +import numpy as np +from algorithms.hotspot import hotspot +from algorithms.squeeze.squeeze import Squeeze, SqueezeOption +from algorithms.autoroot import autoroot +from algorithms.adtributor import adtributor +from algorithms.riskloc import riskloc +from algorithms.rev_rec_adtributor import rev_rec_adtributor + + +def run_method(df, dfs, attributes, algorithm, algorithm_args, derived, debug): + """ + Runs the specified algorithm on a given instance. + :param df: pandas dataframe, the data to use. + :param dfs: list of pandas dataframe, input for squeeze when using derived data. + :param attributes: list, the attributes. + :param algorithm: str, name of the algorithm. + :param algorithm_args: dict, algorithm input parameters. + :param derived: boolean, if using derived data. + :param debug: boolean, if running in debug mode. + :return: list, root cause predictions. 
+ """ + if algorithm == "riskloc": + root_causes = riskloc(df, attributes, derived=derived, debug=debug, **algorithm_args) + elif algorithm == 'autoroot': + root_causes = autoroot(df, attributes, debug=debug, **algorithm_args) + elif algorithm == "squeeze": + if not derived: + model = Squeeze( + data_list=[df], + op=lambda x: x, + option=SqueezeOption(debug=debug, **algorithm_args) + ) + else: + divide = lambda x, y: np.divide(x, y, out=np.zeros_like(x), where=y != 0) + model = Squeeze( + data_list=dfs, + op=divide, + option=SqueezeOption(debug=debug, **algorithm_args) + ) + model.run() + root_causes = model.root_cause_string_list + elif algorithm == "hotspot": + root_causes = [hotspot(df, attributes, debug=debug, **algorithm_args)] + elif algorithm == "adtributor": + root_causes = adtributor(df, attributes, derived=derived, **algorithm_args) + elif algorithm == "r_adtributor": + root_causes = rev_rec_adtributor(df, attributes, derived=derived, **algorithm_args) + else: + raise ValueError("method", algorithm, "not implemented.") + return root_causes + + +def read_dataframe(directory, file, derived): + """ + Reads a root cause example file. + :param directory: str, teh directory with files. + :param file: str, the csv file to use (note: without appending .csv). + :param derived: boolean, if the dataset is a derived measure. + :return: pandas dataframe, attributes, if derived: non-merged dataframes (used for squeeze). + """ + def get_attributes(df): + return sorted(df.columns.drop(['real', 'predict']).tolist()) + + if derived: + file_a = file + '.a.csv' + file_b = file + '.b.csv' + df_a = pd.read_csv(os.path.join(directory, file_a)) + df_b = pd.read_csv(os.path.join(directory, file_b)) + + attributes = get_attributes(df_a) + + df = pd.merge(df_a, df_b, on=attributes, suffixes=('_a', '_b')) + df['real'] = df['real_a'] / df['real_b'] + df['predict'] = df['predict_a'] / df['predict_b'] + df = df.fillna(0.0) # Fix any nans that appeared after dividing by 0. + else: + df = pd.read_csv(os.path.join(directory, file + '.csv')) + attributes = get_attributes(df) + df_a = df_b = None + + return df, attributes, df_a, df_b + + +def get_instances(data_root, directory): + """ + Obtains all instances (files) to be run within a folder. + :param: data_root: str, the root directory for the datasets. + :param directory: str, the directory to be run. + :return: list with tuples containing all files to be run. + """ + path = os.path.join(data_root, directory) + subdirs = [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))] + + if len(subdirs) > 0: + # if there are any directories then we want to recursively go deeper + subdir_insts = [get_instances(data_root, os.path.join(directory, subdir)) for subdir in subdirs] + instances = [inst for subdir_inst in subdir_insts for inst in subdir_inst] + else: + # deepest level, get all files + dir_split = directory.split(os.sep) + dataset = dir_split[0] + subdir = os.path.join(*dir_split[1:]) if len(dir_split) > 1 else '' + + instances = [] + for file in os.listdir(path): + # ignore the file with labels + if os.path.isfile(os.path.join(path, file)) and file != 'injection_info.csv': + instance = (dataset, subdir, file.split(".")[0]) + instances.append(instance) + + # the D folder contains 2 files for each timestamp, so only keep unique ones. + instances = list(set(instances)) + return instances + + +def result_post_processing(parallel_run_results, algorithm, csv_suffix): + """ + Postprocess of the results and saving result csv files. 
:param parallel_run_results: list of tuples, the per-instance results returned by run_instance. + :param algorithm: str, the name of the algorithm that was run (used in the output file names). + :param csv_suffix: str, suffix appended to the output csv file names. + """ + df = pd.DataFrame(parallel_run_results) + df.columns = ['Dataset', 'Folder', 'File', 'F1', 'TP', 'FP', 'FN', 'Time'] + df = df.sort_values(['Dataset', 'Folder']) + + df_summary = df.copy() + A_folder_split = 'layer_' + df_summary['Folder'].str.split('_').str[-1] + \ + '_elements_' + df_summary['Folder'].str.split('_').str[-3] + df_summary['Folder'] = np.where(df_summary['Dataset'] == 'A', A_folder_split, df_summary['Folder']) + + df_summary = df_summary.groupby(['Dataset', 'Folder'], as_index=False). \ + agg({'TP': sum, 'FP': sum, 'FN': sum, 'Time': sum}) + df_summary = df_summary.sort_values(['Dataset', 'Folder']) + df_summary['F1-score'] = 2 * df_summary['TP'] / (2 * df_summary['TP'] + df_summary['FP'] + df_summary['FN']) + + with pd.option_context('display.max_rows', None, 'display.max_columns', None): + print(df_summary) + + df.to_csv(algorithm + '-all' + csv_suffix + '.csv', index=False) + df_summary.to_csv(algorithm + '-summary' + csv_suffix + '.csv', index=False) \ No newline at end of file
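
As a quick sanity check of the evaluation logic above, the following snippet (a minimal sketch, not part of the repository's files) calls `score_root_causes` directly on a hand-written prediction and label. The label string follows the `dim=value&dim=value;...` convention produced by `generate_labels` and stored in `injection_info.csv`; the values shown are illustrative only.
```
# Minimal usage sketch for the evaluation helper (run from the repository root).
from utils.evaluation import score_root_causes

label = 'a=a1&b=b2;c=c3'              # two ground-truth root causes
predictions = ['a=a1&b=b2', 'd=d4']   # one correct and one spurious prediction

TP, FP, FN, true_labels = score_root_causes(predictions, label)
f1 = 2 * TP / (2 * TP + FP + FN)
print(TP, FP, FN, f1)                 # expected output: 1 1 1 0.5
```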