diff --git a/.github/workflows/run-forecast-unit-tests.yml b/.github/workflows/run-forecast-unit-tests.yml index 1501862f5..7826c5b7c 100644 --- a/.github/workflows/run-forecast-unit-tests.yml +++ b/.github/workflows/run-forecast-unit-tests.yml @@ -27,7 +27,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10"] + python-version: ["3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/run-operators-unit-tests.yml b/.github/workflows/run-operators-unit-tests.yml index d0fe4d9c7..239ee56c5 100644 --- a/.github/workflows/run-operators-unit-tests.yml +++ b/.github/workflows/run-operators-unit-tests.yml @@ -27,7 +27,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v4 diff --git a/THIRD_PARTY_LICENSES.txt b/THIRD_PARTY_LICENSES.txt index f80ce825a..809335e3d 100644 --- a/THIRD_PARTY_LICENSES.txt +++ b/THIRD_PARTY_LICENSES.txt @@ -483,6 +483,11 @@ rrcf * Source code: https://github.com/kLabUM/rrcf * Project home: https://github.com/kLabUM/rrcf +Merlion +* Copyright 2021 Salesforce.com Inc +* License: BSD-3 Clause License +* Source code: https://github.com/salesforce/Merlion +* Project Home: https://github.com/salesforce/Merlion =============================== Licenses =============================== ------------------------------------------------------------------------ diff --git a/ads/opctl/operator/lowcode/anomaly/const.py b/ads/opctl/operator/lowcode/anomaly/const.py index f5ed3df57..ea2bb7509 100644 --- a/ads/opctl/operator/lowcode/anomaly/const.py +++ b/ads/opctl/operator/lowcode/anomaly/const.py @@ -21,6 +21,23 @@ class SupportedModels(str, metaclass=ExtendedEnumMeta): EE = "ee" ISOLATIONFOREST = "isolationforest" + # point anomaly + DAGMM = "dagmm" + DEEP_POINT_ANOMALY_DETECTOR = "deep_point_anomaly_detector" + LSTM_ED = "lstm_ed" + SPECTRAL_RESIDUAL = "spectral_residual" + VAE = "vae" + + # forecast_based + ARIMA = "arima" + ETS = "ets" + PROPHET = "prophet" + SARIMA = "sarima" + + # changepoint + BOCPD = "bocpd" + + class NonTimeADSupportedModels(str, metaclass=ExtendedEnumMeta): """Supported non time-based anomaly detection models.""" @@ -29,7 +46,7 @@ class NonTimeADSupportedModels(str, metaclass=ExtendedEnumMeta): RandomCutForest = "randomcutforest" # TODO : Add DBScan # DBScan = "dbscan" - + class TODSSubModels(str, metaclass=ExtendedEnumMeta): """Supported TODS sub models.""" @@ -61,6 +78,54 @@ class TODSSubModels(str, metaclass=ExtendedEnumMeta): } +class MerlionADModels(str, metaclass=ExtendedEnumMeta): + """Supported Merlion AD sub models.""" + + # point anomaly + DAGMM = "dagmm" + DEEP_POINT_ANOMALY_DETECTOR = "deep_point_anomaly_detector" + LSTM_ED = "lstm_ed" + SPECTRAL_RESIDUAL = "spectral_residual" + VAE = "vae" + + # forecast_based + ARIMA = "arima" + ETS = "ets" + PROPHET = "prophet" + SARIMA = "sarima" + + # changepoint + BOCPD = "bocpd" + + +MERLIONAD_IMPORT_MODEL_MAP = { + MerlionADModels.DAGMM: ".dagmm", + MerlionADModels.DEEP_POINT_ANOMALY_DETECTOR: ".deep_point_anomaly_detector", + MerlionADModels.LSTM_ED: ".lstm_ed", + MerlionADModels.SPECTRAL_RESIDUAL: ".spectral_residual", + MerlionADModels.VAE: ".vae", + MerlionADModels.ARIMA: ".forecast_based.arima", + MerlionADModels.ETS: ".forecast_based.ets", + MerlionADModels.PROPHET: ".forecast_based.prophet", + MerlionADModels.SARIMA: ".forecast_based.sarima", + MerlionADModels.BOCPD: ".change_point.bocpd", +} + + +MERLIONAD_MODEL_MAP = { + MerlionADModels.DAGMM: "DAGMM", + MerlionADModels.DEEP_POINT_ANOMALY_DETECTOR: "DeepPointAnomalyDetector", + MerlionADModels.LSTM_ED: "LSTMED", + MerlionADModels.SPECTRAL_RESIDUAL: "SpectralResidual", + MerlionADModels.VAE: "VAE", + MerlionADModels.ARIMA: "ArimaDetector", + MerlionADModels.ETS: "ETSDetector", + MerlionADModels.PROPHET: "ProphetDetector", + MerlionADModels.SARIMA: "SarimaDetector", + MerlionADModels.BOCPD: "BOCPD", +} + + class SupportedMetrics(str, metaclass=ExtendedEnumMeta): UNSUPERVISED_UNIFY95 = "unsupervised_unify95" UNSUPERVISED_UNIFY95_LOG_LOSS = "unsupervised_unify95_log_loss" diff --git a/ads/opctl/operator/lowcode/anomaly/model/anomaly_merlion.py b/ads/opctl/operator/lowcode/anomaly/model/anomaly_merlion.py new file mode 100644 index 000000000..cc1e80b52 --- /dev/null +++ b/ads/opctl/operator/lowcode/anomaly/model/anomaly_merlion.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python + +# Copyright (c) 2023, 2024 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ + +import importlib + +import numpy as np +import pandas as pd +from merlion.post_process.threshold import AggregateAlarms +from merlion.utils import TimeSeries + +from ads.common.decorator.runtime_dependency import runtime_dependency +from ads.opctl.operator.lowcode.anomaly.const import ( + MERLIONAD_IMPORT_MODEL_MAP, + MERLIONAD_MODEL_MAP, + OutputColumns, + SupportedModels, +) + +from .anomaly_dataset import AnomalyOutput +from .base_model import AnomalyOperatorBaseModel + + +class AnomalyMerlionOperatorModel(AnomalyOperatorBaseModel): + """Class representing Merlion Anomaly Detection operator model.""" + + @runtime_dependency( + module="merlion", + err_msg=( + "Please run `pip3 install salesforce-merlion[all]` to " + "install the required packages." + ), + ) + def _get_config_model(self, model_name): + """ + Returns a dictionary with model names as keys and a list of model config and model object as values. + + Parameters + ---------- + model_name : str + model name from the Merlion model list. + + Returns + ------- + dict + A dictionary with model names as keys and a list of model config and model object as values. + """ + model_config_map = {} + model_module = importlib.import_module( + name=MERLIONAD_IMPORT_MODEL_MAP.get(model_name), + package="merlion.models.anomaly", + ) + model_config = getattr( + model_module, MERLIONAD_MODEL_MAP.get(model_name) + "Config" + ) + model = getattr(model_module, MERLIONAD_MODEL_MAP.get(model_name)) + model_config_map[model_name] = [model_config, model] + return model_config_map + + def _build_model(self) -> AnomalyOutput: + """ + Builds a Merlion anomaly detection model and trains it using the given data. + + Parameters + ---------- + None + + Returns + ------- + AnomalyOutput + An AnomalyOutput object containing the anomaly detection results. + """ + model_kwargs = self.spec.model_kwargs + anomaly_output = AnomalyOutput(date_column="index") + anomaly_threshold = model_kwargs.get("anomaly_threshold", 95) + model_config_map = {} + model_config_map = self._get_config_model(self.spec.model) + + date_column = self.spec.datetime_column.name + + anomaly_output = AnomalyOutput(date_column=date_column) + # model_objects = defaultdict(list) + for target, df in self.datasets.full_data_dict.items(): + data = df.set_index(date_column) + data = TimeSeries.from_pd(data) + for model_name, (model_config, model) in model_config_map.items(): + if self.spec.model == SupportedModels.BOCPD: + model_config = model_config(**self.spec.model_kwargs) + else: + model_config = model_config( + **{ + **self.spec.model_kwargs, + "threshold": AggregateAlarms( + alm_threshold=model_kwargs.get("alm_threshold") + if model_kwargs.get("alm_threshold") + else None + ), + } + ) + if hasattr(model_config, "target_seq_index"): + model_config.target_seq_index = df.columns.get_loc( + self.spec.target_column + ) + model = model(model_config) + + scores = model.train(train_data=data, anomaly_labels=None) + scores = scores.to_pd().reset_index() + scores["anom_score"] = ( + scores["anom_score"] - scores["anom_score"].min() + ) / (scores["anom_score"].max() - scores["anom_score"].min()) + + try: + y_pred = model.get_anomaly_label(data) + y_pred = (y_pred.to_pd().reset_index()["anom_score"] > 0).astype( + int + ) + except Exception as e: + y_pred = ( + scores["anom_score"] + > np.percentile( + scores["anom_score"], + anomaly_threshold, + ) + ).astype(int) + + index_col = df.columns[0] + + anomaly = pd.DataFrame( + {index_col: df[index_col], OutputColumns.ANOMALY_COL: y_pred} + ).reset_index(drop=True) + score = pd.DataFrame( + { + index_col: df[index_col], + OutputColumns.SCORE_COL: scores["anom_score"], + } + ).reset_index(drop=True) + # model_objects[model_name].append(model) + + anomaly_output.add_output(target, anomaly, score) + return anomaly_output + + def _generate_report(self): + """Genreates a report for the model.""" + import report_creator as rc + + other_sections = [ + rc.Heading("Selected Models Overview", level=2), + rc.Text( + "The following tables provide information regarding the chosen model." + ), + ] + + model_description = rc.Text( + "The Merlion anomaly detection model is a full-stack automated machine learning system for anomaly detection." + ) + + return ( + model_description, + other_sections, + ) diff --git a/ads/opctl/operator/lowcode/anomaly/model/autots.py b/ads/opctl/operator/lowcode/anomaly/model/autots.py index 724aa2cae..c795440de 100644 --- a/ads/opctl/operator/lowcode/anomaly/model/autots.py +++ b/ads/opctl/operator/lowcode/anomaly/model/autots.py @@ -5,15 +5,17 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ from ads.common.decorator.runtime_dependency import runtime_dependency +from ads.opctl import logger from ads.opctl.operator.lowcode.anomaly.const import OutputColumns + +from ..const import SupportedModels from .anomaly_dataset import AnomalyOutput from .base_model import AnomalyOperatorBaseModel -from ..const import SupportedModels -from ads.opctl import logger class AutoTSOperatorModel(AnomalyOperatorBaseModel): """Class representing AutoTS Anomaly Detection operator model.""" + model_mapping = { "isolationforest": "IsolationForest", "lof": "LOF", @@ -22,30 +24,43 @@ class AutoTSOperatorModel(AnomalyOperatorBaseModel): "rolling_zscore": "rolling_zscore", "mad": "mad", "minmax": "minmax", - "iqr": "IQR" + "iqr": "IQR", } @runtime_dependency( module="autots", err_msg=( - "Please run `pip3 install autots` to " - "install the required dependencies for AutoTS." + "Please run `pip3 install autots` to " + "install the required dependencies for AutoTS." ), ) def _build_model(self) -> AnomalyOutput: from autots.evaluator.anomaly_detector import AnomalyDetector - method = SupportedModels.ISOLATIONFOREST if self.spec.model == SupportedModels.AutoTS else self.spec.model - model_params = {"method": self.model_mapping[method], - "transform_dict": self.spec.model_kwargs.get("transform_dict", {}), - "output": self.spec.model_kwargs.get("output", "univariate"), "method_params": {}} + method = ( + SupportedModels.ISOLATIONFOREST + if self.spec.model == SupportedModels.AutoTS + else self.spec.model + ) + model_params = { + "method": self.model_mapping[method], + "transform_dict": self.spec.model_kwargs.get("transform_dict", {}), + "output": self.spec.model_kwargs.get("output", "univariate"), + "method_params": {}, + } # Supported methods with contamination param - if method in [SupportedModels.ISOLATIONFOREST, SupportedModels.LOF, SupportedModels.EE]: - model_params["method_params"][ - "contamination"] = self.spec.contamination if self.spec.contamination else 0.01 - else: - if self.spec.contamination: - raise ValueError(f"The contamination parameter is not supported for the selected model \"{method}\"") + if method in [ + SupportedModels.ISOLATIONFOREST, + SupportedModels.LOF, + SupportedModels.EE, + ]: + model_params["method_params"]["contamination"] = ( + self.spec.contamination if self.spec.contamination else 0.01 + ) + elif self.spec.contamination: + raise ValueError( + f'The contamination parameter is not supported for the selected model "{method}"' + ) logger.info(f"model params: {model_params}") model = AnomalyDetector(**model_params) diff --git a/ads/opctl/operator/lowcode/anomaly/model/factory.py b/ads/opctl/operator/lowcode/anomaly/model/factory.py index 436cdcf73..10df5733c 100644 --- a/ads/opctl/operator/lowcode/anomaly/model/factory.py +++ b/ads/opctl/operator/lowcode/anomaly/model/factory.py @@ -4,14 +4,16 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ from ads.opctl.operator.lowcode.anomaly.utils import select_auto_model + +from ..const import NonTimeADSupportedModels, SupportedModels +from ..operator_config import AnomalyOperatorConfig from .anomaly_dataset import AnomalyDatasets +from .anomaly_merlion import AnomalyMerlionOperatorModel from .autots import AutoTSOperatorModel from .base_model import AnomalyOperatorBaseModel from .isolationforest import IsolationForestOperatorModel from .oneclasssvm import OneClassSVMOperatorModel from .randomcutforest import RandomCutForestOperatorModel -from ..const import NonTimeADSupportedModels, SupportedModels -from ..operator_config import AnomalyOperatorConfig class UnSupportedModelError(Exception): @@ -48,7 +50,17 @@ class AnomalyOperatorModelFactory: SupportedModels.ZSCORE: AutoTSOperatorModel, SupportedModels.ROLLING_ZSCORE: AutoTSOperatorModel, SupportedModels.EE: AutoTSOperatorModel, - SupportedModels.MAD: AutoTSOperatorModel + SupportedModels.MAD: AutoTSOperatorModel, + SupportedModels.DAGMM: AnomalyMerlionOperatorModel, + SupportedModels.DEEP_POINT_ANOMALY_DETECTOR: AnomalyMerlionOperatorModel, + SupportedModels.LSTM_ED: AnomalyMerlionOperatorModel, + SupportedModels.SPECTRAL_RESIDUAL: AnomalyMerlionOperatorModel, + SupportedModels.VAE: AnomalyMerlionOperatorModel, + SupportedModels.ARIMA: AnomalyMerlionOperatorModel, + SupportedModels.ETS: AnomalyMerlionOperatorModel, + SupportedModels.PROPHET: AnomalyMerlionOperatorModel, + SupportedModels.SARIMA: AnomalyMerlionOperatorModel, + SupportedModels.BOCPD: AnomalyMerlionOperatorModel, } _NonTime_MAP = { diff --git a/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py b/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py index e2b8b9d5a..17f19351d 100644 --- a/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py +++ b/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py @@ -36,7 +36,7 @@ def _build_model(self) -> AnomalyOutput: # Set tree parameters num_trees = model_kwargs.get("num_trees", 200) shingle_size = model_kwargs.get("shingle_size", None) - anomaly_threshold = model_kwargs.get("anamoly_threshold", 95) + anomaly_threshold = model_kwargs.get("anomaly_threshold", 95) for target, df in self.datasets.full_data_dict.items(): try: diff --git a/ads/opctl/operator/lowcode/anomaly/schema.yaml b/ads/opctl/operator/lowcode/anomaly/schema.yaml index 31cb8afaa..aba6c4e82 100644 --- a/ads/opctl/operator/lowcode/anomaly/schema.yaml +++ b/ads/opctl/operator/lowcode/anomaly/schema.yaml @@ -370,6 +370,16 @@ spec: - rolling_zscore - mad - ee + - dagmm + - deep_point_anomaly_detector + - lstm_ed + - spectral_residual + - vae + - arima + - ets + - sarima + - bocpd + - prophet meta: description: "The model to be used for anomaly detection" diff --git a/ads/opctl/operator/lowcode/anomaly/utils.py b/ads/opctl/operator/lowcode/anomaly/utils.py index d93b69810..902e7f186 100644 --- a/ads/opctl/operator/lowcode/anomaly/utils.py +++ b/ads/opctl/operator/lowcode/anomaly/utils.py @@ -5,6 +5,7 @@ import os +import numpy as np import pandas as pd from ads.opctl import logger @@ -27,6 +28,8 @@ def _build_metrics_df(y_true, y_pred, column_name): ) metrics = {} + np.nan_to_num(y_true, copy=False) + np.nan_to_num(y_pred, copy=False) metrics[SupportedMetrics.RECALL] = recall_score(y_true, y_pred) metrics[SupportedMetrics.PRECISION] = precision_score(y_true, y_pred) metrics[SupportedMetrics.ACCURACY] = accuracy_score(y_true, y_pred) diff --git a/pyproject.toml b/pyproject.toml index d1bd85131..45c7c9232 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -179,7 +179,8 @@ anomaly = [ "oracledb", "report-creator==1.0.9", "rrcf==0.4.4", - "scikit-learn" + "scikit-learn", + "salesforce-merlion[all]==2.0.4" ] recommender = [ "oracle_ads[opctl]", diff --git a/tests/operators/anomaly/test_anomaly_simple.py b/tests/operators/anomaly/test_anomaly_simple.py index ddb6b35c2..658d292a5 100644 --- a/tests/operators/anomaly/test_anomaly_simple.py +++ b/tests/operators/anomaly/test_anomaly_simple.py @@ -3,20 +3,44 @@ # Copyright (c) 2023, 2024 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -from ads.opctl.operator.lowcode.anomaly.const import NonTimeADSupportedModels, SupportedModels -import yaml +import os import subprocess -import pandas as pd -import pytest -from time import sleep -from copy import deepcopy import tempfile -import os -import numpy as np +from copy import deepcopy from datetime import datetime -from ads.opctl.operator.cmd import run +from time import sleep -MODELS = ["autots", "iqr", "lof", "zscore", "rolling_zscore", "mad", "ee", "isolationforest"] +import numpy as np +import pandas as pd +import pytest +import yaml + +from ads.opctl.operator.cmd import run +from ads.opctl.operator.lowcode.anomaly.const import ( + NonTimeADSupportedModels, + SupportedModels, +) + +MODELS = [ + "autots", + "iqr", + "lof", + "zscore", + "rolling_zscore", + "mad", + "ee", + "isolationforest", + "dagmm", + "deep_point_anomaly_detector", + "lstm_ed", + "spectral_residual", + "vae", + "arima", + "ets", + "prophet", + "sarima", + "bocpd", +] # Mandatory YAML parameters TEMPLATE_YAML = { @@ -51,7 +75,26 @@ for d in DATASETS: parameters_short.append((m, d)) -MODELS = ["autots", "oneclasssvm", "isolationforest", "randomcutforest"] +# "autoencoder", "stat_residual", "mses",, "dbl", +# "windstats", "windstats_monthly", "zms", + +MODELS = [ + "autots", + "oneclasssvm", + "isolationforest", + "randomcutforest", + "dagmm", + "deep_point_anomaly_detector", + "lstm_ed", + "spectral_residual", + "vae", + "arima", + "ets", + "prophet", + "sarima", + "bocpd", +] + @pytest.mark.parametrize("model", ["autots"]) def test_artificial_big(model): @@ -124,6 +167,10 @@ def test_artificial_small(model): np.concatenate([d1, d2, outliers], axis=0), columns=["val_1", "val_2"] ) d = d.reset_index().rename({"index": "ds"}, axis=1) + if model not in NonTimeADSupportedModels.values(): + d["ds"] = pd.date_range( + datetime.today(), periods=d.shape[0], freq="1D" + ).strftime("%Y-%m-%d") with tempfile.TemporaryDirectory() as tmpdirname: anomaly_yaml_filename = f"{tmpdirname}/anomaly.yaml" input_data = f"{tmpdirname}/data.csv" @@ -138,6 +185,7 @@ def test_artificial_small(model): yaml_i["spec"]["contamination"] = 0.3 if model in NonTimeADSupportedModels.values(): del yaml_i["spec"]["datetime_column"] + yaml_i["spec"]["target_column"] = "val_1" # run(yaml_i, debug=False) @@ -171,6 +219,9 @@ def test_validation(model): ) if model not in NonTimeADSupportedModels.values(): d = d.reset_index().rename({"index": "ds"}, axis=1) + d["ds"] = pd.date_range( + datetime.today(), periods=d.shape[0], freq="1D" + ).strftime("%Y-%m-%d") anomaly_col["ds"] = d["ds"] v = d.copy() v["anomaly"] = anomaly_col["anomaly"] @@ -217,11 +268,14 @@ def test_load_datasets(model, data_dict): yaml_i = deepcopy(TEMPLATE_YAML) yaml_i["spec"]["model"] = model yaml_i["spec"]["input_data"]["url"] = data_dict["url"] - if model in set(NonTimeADSupportedModels.values()) - set(SupportedModels.values()): + if model in set(NonTimeADSupportedModels.values()) - set( + SupportedModels.values() + ): del yaml_i["spec"]["datetime_column"] else: yaml_i["spec"]["datetime_column"]["name"] = data_dict["dt_col"] yaml_i["spec"]["output_directory"]["url"] = output_dirname + yaml_i["spec"]["target_column"] = data_dict["target"] # run(yaml_i, backend="operator.local", debug=False)