Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Major] Speedup dataset get_item #1636

Merged
merged 24 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@
"try:\n",
" # it already installed dependencies\n",
" from torchviz import make_dot\n",
"except:\n",
"except ImportError:\n",
" # install graphviz on system\n",
" import platform\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@
"try:\n",
" # it already installed dependencies\n",
" from torchviz import make_dot\n",
"except:\n",
"except ImportError:\n",
" # install graphviz on system\n",
" import platform\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
" # it already installed dependencies\n",
" from torchsummary import summary\n",
" from torchviz import make_dot\n",
"except:\n",
"except ImportError:\n",
" # install graphviz on system\n",
" import platform\n",
"\n",
Expand Down Expand Up @@ -69,7 +69,7 @@
"source": [
"try:\n",
" from neuralprophet import NeuralProphet\n",
"except:\n",
"except ImportError:\n",
" # if NeuralProphet is not installed yet:\n",
" !pip install git+https://github.com/ourownstory/neural_prophet.git\n",
" from neuralprophet import NeuralProphet"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,12 @@
"# Set loggers to ERROR level\n",
"import logging\n",
"import warnings\n",
"from neuralprophet import set_log_level\n",
"\n",
"\n",
"logging.getLogger(\"prophet\").setLevel(logging.ERROR)\n",
"warnings.filterwarnings(\"ignore\")\n",
"\n",
"from neuralprophet import set_log_level\n",
"\n",
"set_log_level(\"ERROR\")"
]
Expand Down
5 changes: 5 additions & 0 deletions neuralprophet/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

@dataclass
class Model:
features_map: dict
lagged_reg_layers: Optional[List[int]]
quantiles: Optional[List[float]] = None

def setup_quantiles(self):
Expand All @@ -42,6 +44,9 @@
self.quantiles.insert(0, 0.5)


ConfigModel = Model


@dataclass
class Normalization:
normalize: str
Expand Down Expand Up @@ -345,7 +350,7 @@
log.error("Invalid growth for global_local mode '{}'. Set to 'global'".format(self.trend_global_local))
self.trend_global_local = "global"

if self.trend_local_reg < 0:

Check failure on line 353 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Operator "<" not supported for "None" (reportOptionalOperand)
log.error("Invalid negative trend_local_reg '{}'. Set to False".format(self.trend_local_reg))
self.trend_local_reg = False

Expand Down Expand Up @@ -394,13 +399,13 @@
log.error("Invalid global_local mode '{}'. Set to 'global'".format(self.global_local))
self.global_local = "global"

self.periods = OrderedDict(

Check failure on line 402 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

No overloads for "__init__" match the provided arguments (reportCallIssue)
{

Check failure on line 403 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Argument of type "dict[str, Season]" cannot be assigned to parameter "iterable" of type "Iterable[list[bytes]]" in function "__init__" (reportArgumentType)
"yearly": Season(
resolution=6,
period=365.25,
arg=self.yearly_arg,
global_local=(

Check failure on line 408 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Argument of type "SeasonGlobalLocalMode | Literal['auto']" cannot be assigned to parameter "global_local" of type "SeasonGlobalLocalMode" in function "__init__" (reportArgumentType)
self.yearly_global_local
if self.yearly_global_local in ["global", "local"]
else self.global_local
Expand All @@ -411,7 +416,7 @@
resolution=3,
period=7,
arg=self.weekly_arg,
global_local=(

Check failure on line 419 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Argument of type "SeasonGlobalLocalMode | Literal['auto']" cannot be assigned to parameter "global_local" of type "SeasonGlobalLocalMode" in function "__init__" (reportArgumentType)
self.weekly_global_local
if self.weekly_global_local in ["global", "local"]
else self.global_local
Expand All @@ -422,7 +427,7 @@
resolution=6,
period=1,
arg=self.daily_arg,
global_local=(

Check failure on line 430 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Argument of type "SeasonGlobalLocalMode | Literal['auto']" cannot be assigned to parameter "global_local" of type "SeasonGlobalLocalMode" in function "__init__" (reportArgumentType)
self.daily_global_local if self.daily_global_local in ["global", "local"] else self.global_local
),
condition_name=None,
Expand All @@ -430,7 +435,7 @@
}
)

assert self.seasonality_local_reg >= 0, "Invalid seasonality_local_reg '{}'.".format(self.seasonality_local_reg)

Check failure on line 438 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Operator ">=" not supported for "None" (reportOptionalOperand)

if self.seasonality_local_reg is True:
log.warning("seasonality_local_reg = True. Default seasonality_local_reg value set to 1")
Expand All @@ -448,7 +453,7 @@
resolution=resolution,
period=period,
arg=arg,
global_local=global_local if global_local in ["global", "local"] else self.global_local,

Check failure on line 456 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Argument of type "str" cannot be assigned to parameter "global_local" of type "SeasonGlobalLocalMode" in function "__init__"   Type "str" is not assignable to type "SeasonGlobalLocalMode"     "str" is not assignable to type "Literal['global']"     "str" is not assignable to type "Literal['local']"     "str" is not assignable to type "Literal['glocal']" (reportArgumentType)
condition_name=condition_name,
)

Expand Down Expand Up @@ -509,7 +514,7 @@
class ConfigLaggedRegressors:
layers: Optional[List[int]] = field(default_factory=list)
# List of hidden layers for shared NN across LaggedReg. The default value is ``[]``, which initializes no hidden layers.
regressors: OrderedDict[LaggedRegressor] = field(init=False)

Check failure on line 517 in neuralprophet/configure.py

View workflow job for this annotation

GitHub Actions / pyright

Too few type arguments provided for "OrderedDict"; expected 2 but received 1 (reportInvalidTypeArguments)

def __post_init__(self):
self.regressors = None
Expand Down
4 changes: 3 additions & 1 deletion neuralprophet/data/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def _handle_missing_data(
return df


def _create_dataset(model, df, predict_mode, prediction_frequency=None):
def _create_dataset(model, df, predict_mode, prediction_frequency=None, components_stacker=None):
"""Construct dataset from dataframe.

(Configured Hyperparameters can be overridden by explicitly supplying them.
Expand Down Expand Up @@ -626,5 +626,7 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None):
config_regressors=model.config_regressors,
config_lagged_regressors=model.config_lagged_regressors,
config_missing=model.config_missing,
config_model=model.config_model,
components_stacker=components_stacker,
# config_train=model.config_train, # no longer needed since JIT tabularization.
)
102 changes: 89 additions & 13 deletions neuralprophet/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@
from matplotlib.axes import Axes
from torch.utils.data import DataLoader

from neuralprophet import configure, df_utils, np_types, time_dataset, time_net, utils, utils_lightning, utils_metrics
from neuralprophet import (
configure,
df_utils,
np_types,
time_dataset,
time_net,
utils,
utils_lightning,
utils_metrics,
utils_time_dataset,
)
from neuralprophet.data.process import (
_check_dataframe,
_convert_raw_predictions_to_raw_df,
Expand All @@ -34,6 +44,7 @@
from neuralprophet.plot_model_parameters_plotly import plot_parameters as plot_parameters_plotly
from neuralprophet.plot_utils import get_valid_configuration, log_warning_deprecation_plotly, select_plotting_backend
from neuralprophet.uncertainty import Conformal
from neuralprophet.utils_time_dataset import ComponentStacker

log = logging.getLogger("NP.forecaster")

Expand Down Expand Up @@ -503,6 +514,8 @@ def __init__(

# Model
self.config_model = configure.Model(
features_map={},
lagged_reg_layers=lagged_reg_layers,
quantiles=quantiles,
)
self.config_model.setup_quantiles()
Expand Down Expand Up @@ -1067,7 +1080,7 @@ def fit(
or any(value != 1 for value in self.num_seasonalities_modelled_dict.values())
)

##### Data Setup, and Training Setup #####
# Data Setup, and Training Setup
# Train Configuration: overwrite self.config_train with user provided values
if learning_rate is not None:
self.config_train.learning_rate = learning_rate
Expand Down Expand Up @@ -1152,7 +1165,22 @@ def fit(
# Set up DataLoaders: Train
# Create TimeDataset
# Note: _create_dataset() needs to be called after set_auto_seasonalities()
dataset = _create_dataset(self, df, predict_mode=False, prediction_frequency=self.prediction_frequency)
train_components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
n_forecasts=self.n_forecasts,
max_lags=self.max_lags,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)

dataset = _create_dataset(
self,
df,
predict_mode=False,
prediction_frequency=self.prediction_frequency,
components_stacker=train_components_stacker,
)
# Determine the max_number of epochs
self.config_train.set_auto_batch_epoch(n_data=len(dataset))
# Create Train DataLoader
Expand All @@ -1162,6 +1190,7 @@ def fit(
shuffle=True,
num_workers=num_workers,
)

self.config_train.set_batches_per_epoch(len(loader))
log.info(f"Train Dataset size: {len(dataset)}")
log.info(f"Number of batches per training epoch: {len(loader)}")
Expand All @@ -1186,7 +1215,15 @@ def fit(
)
# df_val, _, _, _ = df_utils.prep_or_copy_df(df_val)
df_val = _normalize(df=df_val, config_normalization=self.config_normalization)
dataset_val = _create_dataset(self, df_val, predict_mode=False)
val_components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
max_lags=self.max_lags,
n_forecasts=self.n_forecasts,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)
dataset_val = _create_dataset(self, df_val, predict_mode=False, components_stacker=val_components_stacker)
loader_val = DataLoader(dataset_val, batch_size=min(1024, len(dataset_val)), shuffle=False, drop_last=False)

# Init the Trainer
Expand All @@ -1206,12 +1243,16 @@ def fit(
if not self.fitted:
self.model = self._init_model()

self.model.set_components_stacker(components_stacker=train_components_stacker, mode="train")
if validation_enabled:
self.model.set_components_stacker(components_stacker=val_components_stacker, mode="val")

# Find suitable learning rate if not set
if self.config_train.learning_rate is None:
assert not self.fitted, "Learning rate must be provided for re-training a fitted model."

## Init a separate Model, Loader and Trainer copy for LR finder (optional, done for safety)
## Note Leads to a CUDA issue. Needs to be fixed before enabling this feature.
# Init a separate Model, Loader and Trainer copy for LR finder (optional, done for safety)
# Note Leads to a CUDA issue. Needs to be fixed before enabling this feature.
# model_lr_finder = self._init_model()
# loader_lr_finder = DataLoader(
# dataset,
Expand Down Expand Up @@ -1414,7 +1455,16 @@ def test(self, df: pd.DataFrame, verbose: bool = True):
)
df, _, _, _ = df_utils.prep_or_copy_df(df)
df = _normalize(df=df, config_normalization=self.config_normalization)
dataset = _create_dataset(self, df, predict_mode=False)
components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
n_forecasts=self.n_forecasts,
max_lags=self.max_lags,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)
dataset = _create_dataset(self, df, predict_mode=False, components_stacker=components_stacker)
self.model.set_components_stacker(components_stacker, mode="test")
test_loader = DataLoader(dataset, batch_size=min(1024, len(dataset)), shuffle=False, drop_last=False)
# Use Lightning to calculate metrics
val_metrics = self.trainer.test(self.model, dataloaders=test_loader, verbose=verbose)
Expand Down Expand Up @@ -2047,6 +2097,13 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5):
df = _normalize(df=df, config_normalization=self.config_normalization)
df_seasonal = pd.DataFrame()
for df_name, df_i in df.groupby("ID"):
feature_unstackor = ComponentStacker(
n_lags=0,
max_lags=0,
n_forecasts=1,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should set self.config_lagged_regressors to None

)
dataset = time_dataset.TimeDataset(
df=df_i,
predict_mode=True,
Expand All @@ -2060,25 +2117,29 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5):
config_regressors=self.config_regressors,
config_lagged_regressors=self.config_lagged_regressors,
config_missing=self.config_missing,
config_model=self.config_model,
components_stacker=feature_unstackor,
# config_train=self.config_train, # no longer needed since JIT tabularization.
)
self.model.set_components_stacker(feature_unstackor, mode="predict")
loader = DataLoader(dataset, batch_size=min(4096, len(df)), shuffle=False, drop_last=False)
predicted = {}
for name in self.config_seasonality.periods:
predicted[name] = list()
for inputs, _, meta in loader:
for inputs_tensor, meta in loader:
# Meta as a tensor for prediction
if self.model.config_seasonality is None:
meta_name_tensor = None
elif self.model.config_seasonality.global_local in ["local", "glocal"]:
meta = OrderedDict()
meta["df_name"] = [df_name for _ in range(inputs["time"].shape[0])]
time_input = feature_unstackor.unstack_component("time", inputs_tensor)
meta["df_name"] = [df_name for _ in range(time_input.shape[0])]
meta_name_tensor = torch.tensor([self.model.id_dict[i] for i in meta["df_name"]]) # type: ignore
else:
meta_name_tensor = None

seasonalities_input = feature_unstackor.unstack_component("seasonalities", inputs_tensor)
for name in self.config_seasonality.periods:
features = inputs["seasonalities"][name]
features = seasonalities_input[name]
quantile_index = self.config_model.quantiles.index(quantile)
y_season = torch.squeeze(
self.model.seasonality.compute_fourier(features=features, name=name, meta=meta_name_tensor)[
Expand Down Expand Up @@ -2880,7 +2941,22 @@ def _predict_raw(self, df, df_name, include_components=False, prediction_frequen
assert len(df["ID"].unique()) == 1
if "y_scaled" not in df.columns or "t" not in df.columns:
raise ValueError("Received unprepared dataframe to predict. " "Please call predict_dataframe_to_predict.")
dataset = _create_dataset(self, df, predict_mode=True, prediction_frequency=prediction_frequency)
components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
n_forecasts=self.n_forecasts,
max_lags=self.max_lags,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)
dataset = _create_dataset(
self,
df,
predict_mode=True,
prediction_frequency=prediction_frequency,
components_stacker=components_stacker,
)
self.model.set_components_stacker(components_stacker, mode="predict")
loader = DataLoader(dataset, batch_size=min(1024, len(df)), shuffle=False, drop_last=False)
if self.n_forecasts > 1:
dates = df["ds"].iloc[self.max_lags : -self.n_forecasts + 1]
Expand All @@ -2893,7 +2969,7 @@ def _predict_raw(self, df, df_name, include_components=False, prediction_frequen
self.model.set_covar_weights(self.model.get_covar_weights())
# Compute the predictions and components (if requested)
result = self.trainer.predict(self.model, loader)
# Extract the prediction and components
# unstack the prediction and components
predicted, component_vectors = zip(*result)
predicted = np.concatenate(predicted)

Expand Down
Loading
Loading