From 76b75417e46fc6296d532700c4b3c5708bd1cdd2 Mon Sep 17 00:00:00 2001 From: govarsha Date: Tue, 7 Nov 2023 16:17:11 +0530 Subject: [PATCH] changes for ODSC-49565 --- .../lowcode/forecast/model/base_model.py | 38 +++++---- .../forecast/model/forecast_datasets.py | 6 +- ads/opctl/operator/lowcode/forecast/utils.py | 84 +++++++++---------- 3 files changed, 63 insertions(+), 65 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 0e5e5a042..f3db137c2 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -25,6 +25,7 @@ from ads.common.decorator.runtime_dependency import runtime_dependency from .forecast_datasets import ForecastDatasets + class ForecastOperatorBaseModel(ABC): """The base class for the forecast operator models.""" @@ -350,24 +351,25 @@ def _test_evaluate_metrics( horizon_periods=self.spec.horizon, ) - summary_metrics = summary_metrics.append(metrics_per_horizon) - - new_column_order = [ - SupportedMetrics.MEAN_SMAPE, - SupportedMetrics.MEDIAN_SMAPE, - SupportedMetrics.MEAN_MAPE, - SupportedMetrics.MEDIAN_MAPE, - SupportedMetrics.MEAN_WMAPE, - SupportedMetrics.MEDIAN_WMAPE, - SupportedMetrics.MEAN_RMSE, - SupportedMetrics.MEDIAN_RMSE, - SupportedMetrics.MEAN_R2, - SupportedMetrics.MEDIAN_R2, - SupportedMetrics.MEAN_EXPLAINED_VARIANCE, - SupportedMetrics.MEDIAN_EXPLAINED_VARIANCE, - SupportedMetrics.ELAPSED_TIME, - ] - summary_metrics = summary_metrics[new_column_order] + if not metrics_per_horizon.empty: + summary_metrics = summary_metrics.append(metrics_per_horizon) + + new_column_order = [ + SupportedMetrics.MEAN_SMAPE, + SupportedMetrics.MEDIAN_SMAPE, + SupportedMetrics.MEAN_MAPE, + SupportedMetrics.MEDIAN_MAPE, + SupportedMetrics.MEAN_WMAPE, + SupportedMetrics.MEDIAN_WMAPE, + SupportedMetrics.MEAN_RMSE, + SupportedMetrics.MEDIAN_RMSE, + SupportedMetrics.MEAN_R2, + SupportedMetrics.MEDIAN_R2, + SupportedMetrics.MEAN_EXPLAINED_VARIANCE, + SupportedMetrics.MEDIAN_EXPLAINED_VARIANCE, + SupportedMetrics.ELAPSED_TIME, + ] + summary_metrics = summary_metrics[new_column_order] return total_metrics, summary_metrics, data diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index d528c4370..fd4addfa8 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -57,9 +57,9 @@ def _load_data(self, spec): storage_options=default_signer(), columns=spec.additional_data.columns, ) - additional_data = Transformations( - raw_data, self.spec - )._sort_by_datetime_col(additional_data) + additional_data = Transformations(raw_data, spec)._sort_by_datetime_col( + additional_data + ) self.original_additional_data = additional_data.copy() self.original_total_data = pd.concat([data, additional_data], axis=1) diff --git a/ads/opctl/operator/lowcode/forecast/utils.py b/ads/opctl/operator/lowcode/forecast/utils.py index 9b460c5c7..a36dc552b 100644 --- a/ads/opctl/operator/lowcode/forecast/utils.py +++ b/ads/opctl/operator/lowcode/forecast/utils.py @@ -24,9 +24,11 @@ from ads.dataset.label_encoder import DataFrameLabelEncoder from ads.opctl import logger -from .const import SupportedMetrics, SupportedModels +from .const import MAX_COLUMNS_AUTOMLX, SupportedMetrics, SupportedModels from .errors import ForecastInputDataError, ForecastSchemaYamlError -from 
.operator_config import ForecastOperatorSpec, ForecastOperatorConfig
+import re
+from .operator_config import ForecastOperatorSpec
+from .const import SupportedModels
 
 
 def _label_encode_dataframe(df, no_encode=set()):
@@ -77,13 +79,36 @@
         Pandas Dataframe
             Dataframe with Mean sMAPE, Median sMAPE, Mean MAPE, Median MAPE, Mean wMAPE, Median wMAPE values for each horizon
     """
-    actuals_df = data[target_columns]
+    """
+    Assumptions:
+    data and outputs contain all of the target columns.
+    The yhats in outputs are in the same order as in target_columns.
+    The test data may not have sorted dates, and the order of the series may differ as well.
+    """
+
+    # Select the data with the correct order of target_columns.
+    actuals_df = data[["ds"] + target_columns]
+
+    # Concatenate the yhats from outputs, keeping only the dates present in the test data
     forecasts_df = pd.concat(
-        [df[target_col].iloc[-horizon_periods:] for df in outputs], axis=1
+        [
+            (df[df["ds"].isin(actuals_df["ds"])][["ds", target_col]]).set_index("ds")
+            for df in outputs
+        ],
+        axis=1,
     )
+
+    # Remove dates that are not present in the outputs
+    actuals_df = actuals_df[actuals_df["ds"].isin(forecasts_df.index.values)]
+
+    if actuals_df.empty or forecasts_df.empty:
+        return pd.DataFrame()
+
     totals = actuals_df.sum()
     wmape_weights = np.array((totals / totals.sum()).values)
+
+    actuals_df = actuals_df.set_index("ds")
+
     metrics_df = pd.DataFrame(
         columns=[
             SupportedMetrics.MEAN_SMAPE,
@@ -121,12 +146,9 @@
             }
 
             metrics_df = pd.concat(
-                [metrics_df, pd.DataFrame(metrics_row, index=[data["ds"][i]])],
-                ignore_index=True,
+                [metrics_df, pd.DataFrame(metrics_row, index=[actuals_df.index[i]])],
             )
 
-    metrics_df.set_index(data["ds"], inplace=True)
-
     return metrics_df
 
 
@@ -284,9 +306,6 @@
     data["__Series__"] = _merge_category_columns(data, target_category_columns)
     unique_categories = data["__Series__"].unique()
     invalid_categories = []
-    if additional_data is not None and target_column in additional_data.columns:
-        logger.warn(f"Dropping column '{target_column}' from additional_data")
-        additional_data.drop(target_column, axis=1, inplace=True)
     for cat in unique_categories:
        data_by_cat = data[data["__Series__"] == cat].rename(
            {target_column: f"{target_column}_{cat}"}, axis=1
@@ -341,13 +360,12 @@
     return pd.DataFrame.from_dict(metrics, orient="index", columns=[column_name])
 
 
-def evaluate_metrics(target_columns, data, outputs, datetime_col, target_col="yhat"):
+def evaluate_metrics(target_columns, data, outputs, target_col="yhat"):
     total_metrics = pd.DataFrame()
     for idx, col in enumerate(target_columns):
         try:
-            dates = np.intersect1d(data[datetime_col], outputs[idx]["ds"])
-            y_true = np.asarray(data[col][data[datetime_col].isin(dates)])
-            y_pred = outputs[idx][outputs[idx]["ds"].isin(dates)][target_col]
+            y_true = np.asarray(data[col])
+            y_pred = np.asarray(outputs[idx][target_col][: len(y_true)])
 
             metrics_df = _build_metrics_df(
                 y_true=y_true, y_pred=y_pred, column_name=col
@@ -467,9 +485,7 @@ def human_time_friendly(seconds):
     return ", ".join(accumulator)
 
 
-def select_auto_model(
-    datasets: "ForecastDatasets", operator_config: ForecastOperatorConfig
-) -> str:
+def select_auto_model(columns: List[str]) -> str:
     """
     Selects AutoMLX or Arima model based on column count.
 
 
     Parameters
     ------------
-    datasets: ForecastDatasets
-        Datasets for predictions
+    columns: List
+        The list of columns.
 
     Returns
     --------
     str
         The type of the model.
     """
-    date_column = operator_config.spec.datetime_column.name
-    datetimes = pd.to_datetime(datasets.original_user_data[date_column].drop_duplicates())
-    freq_in_secs = datetimes.tail().diff().min().total_seconds()
-    if datasets.original_additional_data is not None:
-        num_of_additional_cols = len(datasets.original_additional_data.columns) - 2
-    else:
-        num_of_additional_cols = 0
-    row_count = len(datasets.original_user_data.index)
-    number_of_series = len(datasets.categories)
-    if num_of_additional_cols < 15 and row_count < 10000 and number_of_series < 10 and freq_in_secs > 3600:
-        return SupportedModels.AutoMLX
-    elif row_count < 10000 and number_of_series > 10:
-        operator_config.spec.model_kwargs["model_list"] = "fast_parallel"
-        return SupportedModels.AutoTS
-    elif row_count < 20000 and number_of_series > 10:
-        operator_config.spec.model_kwargs["model_list"] = "superfast"
-        return SupportedModels.AutoTS
-    elif row_count > 20000:
-        return SupportedModels.NeuralProphet
-    else:
-        return SupportedModels.NeuralProphet
+    if columns is not None and len(columns) > MAX_COLUMNS_AUTOMLX:
+        return SupportedModels.Arima
+    return SupportedModels.AutoMLX
 
 
 def get_frequency_of_datetime(data: pd.DataFrame, dataset_info: ForecastOperatorSpec):
@@ -525,9 +523,7 @@ def get_frequency_of_datetime(data: pd.DataFrame, dataset_info: ForecastOperator
     """
     date_column = dataset_info.datetime_column.name
-    datetimes = pd.to_datetime(
-        data[date_column].drop_duplicates(), format=dataset_info.datetime_column.format
-    )
+    datetimes = pd.to_datetime(data[date_column].drop_duplicates())
     freq = pd.DatetimeIndex(datetimes).inferred_freq
     if dataset_info.model == SupportedModels.AutoMLX:
         freq_in_secs = datetimes.tail().diff().min().total_seconds()
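---
Illustrative notes (not part of the applied diff):

The reworked _build_metrics_per_horizon no longer assumes that the test data is
sorted or that it spans exactly the last `horizon_periods` rows of each output
frame. Instead it intersects dates in both directions with isin(), aligns
actuals and forecasts on a shared "ds" index, and returns an empty frame when
the intersection is empty. A minimal sketch of that alignment, with invented
frames and values ("ds" and "yhat" follow the patch; "Sales_A" and the numbers
are made up):

    import pandas as pd

    # Test data with unsorted dates, as the patch's stated assumptions allow.
    data = pd.DataFrame(
        {
            "ds": pd.to_datetime(["2023-01-03", "2023-01-01", "2023-01-02"]),
            "Sales_A": [12.0, 10.0, 11.0],
        }
    )
    # One output frame per series; its dates may extend past the test window.
    outputs = [
        pd.DataFrame(
            {
                "ds": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-04"]),
                "yhat": [9.5, 11.2, 13.0],
            }
        )
    ]

    actuals_df = data[["ds", "Sales_A"]]
    # Keep only the forecast rows whose dates appear in the test data.
    forecasts_df = pd.concat(
        [
            df[df["ds"].isin(actuals_df["ds"])][["ds", "yhat"]].set_index("ds")
            for df in outputs
        ],
        axis=1,
    )
    # Drop test rows with no matching forecast date, then align on "ds".
    # (The patch returns pd.DataFrame() here when either side ends up empty.)
    actuals_df = actuals_df[actuals_df["ds"].isin(forecasts_df.index.values)]
    actuals_df = actuals_df.set_index("ds")
    # Rows for 2023-01-01 and 2023-01-02 line up; 2023-01-03/04 fall away.
    print(actuals_df.join(forecasts_df))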
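select_auto_model now keys off the column count alone, rather than row count,
series count, and datetime frequency. A standalone mirror of the new rule,
assuming a cap of 15 purely for illustration (the real MAX_COLUMNS_AUTOMLX is
defined in .const and its value is not shown in this patch; the string return
values stand in for the SupportedModels members):

    MAX_COLUMNS_AUTOMLX = 15  # assumed value, for illustration only

    def select_auto_model(columns):
        # More columns than AutoMLX comfortably handles -> fall back to Arima.
        if columns is not None and len(columns) > MAX_COLUMNS_AUTOMLX:
            return "Arima"
        return "AutoMLX"

    print(select_auto_model(["ds"] + [f"y{i}" for i in range(20)]))  # Arima
    print(select_auto_model(["ds", "y0"]))                           # AutoMLX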