
Commit c5524c0
still need to find out what input format is expected
leostre committed May 7, 2024
1 parent 8557ab3 commit c5524c0
Showing 2 changed files with 142 additions and 22 deletions.
144 changes: 129 additions & 15 deletions fedot_ind/core/models/nn/network_impl/deepar.py
@@ -1,4 +1,4 @@
-from core.models.nn.network_impl.base_nn_model import BaseNeuralModel
+from fedot_ind.core.models.nn.network_impl.base_nn_model import BaseNeuralModel
from typing import Optional, Callable, Any, List, Union
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.data.data import InputData
@@ -8,6 +8,20 @@
import torch
from fedot_ind.core.models.nn.network_modules.layers.special import RevIN
from fedot_ind.core.models.nn.network_modules.losses import NormalDistributionLoss
from fedot_ind.core.architecture.settings.computational import backend_methods as np
from fedot_ind.core.architecture.abstraction.decorators import convert_inputdata_to_torch_time_series_dataset
from fedot_ind.core.operation.transformation.window_selector import WindowSizeSelector
import pandas as pd
from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.operations.evaluation.operation_implementations.data_operations.ts_transformations import \
transform_features_and_target_into_lagged
import torch.utils.data as data
from fedot.core.data.data_split import train_test_data_setup  # needed by _prepare_data below
from fedot_ind.core.architecture.settings.computational import default_device


class DeepARModule(Module):
_loss_fns = {
@@ -23,6 +37,7 @@ def __init__(self, cell_type, input_size, hidden_size, rnn_layers, dropout, dist
batch_first=True,
dropout=dropout if rnn_layers > 1 else 0.
)
self.hidden_size = hidden_size
self.scaler = RevIN(
affine=False,
input_dim=input_size,
@@ -108,6 +123,8 @@ def predict(self, x):
output = self._transform_params(distr_params)
return output
def decode(self, x, hidden_state, n_samples=0):
    if not n_samples:
        output, _ = self._decode_whole_seq(x, hidden_state)
@@ -130,7 +147,7 @@ def decode(self, x, hidden_state, n_samples=0):
)
# reshape predictions for n_samples:
# from n_samples * batch_size x time steps to batch_size x time steps x n_samples
-output = (output, lambda x:: x.reshape(-1, n_samples, input_vector.size(1)).permute(0, 2, 1))
+# output = (output, lambda x: x.reshape(-1, n_samples, input_vector.size(1)).permute(0, 2, 1))
return output
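
For illustration only (not part of the commit), this is what the commented-out reshape does, assuming the n_samples draws for each series are laid out contiguously:

    import torch

    n_samples, batch_size, n_steps = 4, 2, 3
    flat = torch.randn(n_samples * batch_size, n_steps)        # n_samples * batch_size x time steps
    regrouped = flat.reshape(-1, n_samples, n_steps).permute(0, 2, 1)
    print(regrouped.shape)  # torch.Size([2, 3, 4]) -> batch_size x time steps x n_samples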


@@ -151,7 +168,6 @@ def _decode_one(self, x,
def decode_autoregressive(
self,
hidden_state: Any,
-# target_scale: Union[List[torch.Tensor], torch.Tensor],
first_target: Union[List[torch.Tensor], torch.Tensor],
n_decoder_steps: int,
n_samples: int = 1,
@@ -187,12 +203,8 @@ def decode_autoregressive(
output = torch.stack(output, dim=1)
return output
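
A toy sketch of the autoregressive idea behind this method (hypothetical step_fn, not the module's API): each step's prediction is fed back as the next input, and the per-step outputs are stacked along dim=1 as above.

    import torch

    def decode_autoregressive_sketch(step_fn, first_target, hidden_state, n_decoder_steps):
        output = []
        current = first_target
        for _ in range(n_decoder_steps):
            # one-step-ahead prediction conditioned on the previous output
            current, hidden_state = step_fn(current, hidden_state)
            output.append(current)
        return torch.stack(output, dim=1)  # batch x n_decoder_steps x ...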
@@ -221,34 +233,136 @@ def __init__(self, params: Optional[OperationParameters] = {}):
self.hidden_size = params.get('hidden_size', 10)
self.rnn_layers = params.get('rnn_layers', 2)
self.dropout = params.get('dropout', 0.1)
-self.horizon = params.get('forecast_length', None)
+self.horizon = params.get('forecast_length', 1)
self.forecast_length = self.horizon
self.expected_distribution = params.get('expected_distribution', 'normal')

# TODO: clarify the expected input format (see commit message)
self.preprocess_to_lagged = False
self.patch_len = params.get('patch_len', None)
self.forecast_mode = params.get('forecast_mode', 'raw')
self.quantiles = params.get('quantiles', None)
def _init_model(self, ts) -> tuple:
self.loss_fn = DeepARModule._loss_fns[self.expected_distribution]()
self.model = DeepARModule(input_size=ts.features.shape[1],
hidden_size=self.hidden_size,
cell_type=self.cell_type,
dropout=self.dropout,
-rnn_layer=self.rnn_layers,
-distribution_params_n=len(self.loss_fn.distribution_arguments)).to(default_device())
+rnn_layers=self.rnn_layers,
+distribution=self.expected_distribution).to(default_device())
self.model_for_inference = DeepARModule(input_size=ts.features.shape[1],
hidden_size=self.hidden_size,
cell_type=self.cell_type,
dropout=self.dropout,
-rnn_layer=self.rnn_layers,
-distribution_params_n=len(self.loss_fn.distribution_arguments))
+rnn_layers=self.rnn_layers,
+distribution=self.expected_distribution)
self._evaluate_num_of_epochs(ts)
self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

return self.loss_fn, self.optimizer
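
For reference, a usage sketch under assumptions: the wrapper class name (here DeepAR) is not visible in this hunk and is hypothetical, while OperationParameters comes from the imports above and the parameter names from __init__.

    params = OperationParameters(hidden_size=10, rnn_layers=2, dropout=0.1,
                                 forecast_length=14, expected_distribution='normal')
    model = DeepAR(params)  # hypothetical name for the implementation class above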


# def __preprocess_for_fedot(self, input_data):
# input_data.features = np.squeeze(input_data.features)
# if self.horizon is None:
# self.horizon = input_data.task.task_params.forecast_length
# if len(input_data.features.shape) == 1:
# input_data.features = input_data.features.reshape(1, -1)
# else:
# if input_data.features.shape[1] != 1:
# self.preprocess_to_lagged = True

# if self.preprocess_to_lagged:
# self.seq_len = input_data.features.shape[0] + \
# input_data.features.shape[1]
# else:
# self.seq_len = input_data.features.shape[1]
# self.target = input_data.target
# self.task_type = input_data.task
# return input_data

def _fit_model(self, input_data: InputData, split_data: bool = True):
    if self.preprocess_to_lagged:
        self.patch_len = input_data.features.shape[1]
        train_loader = self.__create_torch_loader(input_data)
        val_loader = None
    else:
        if self.patch_len is None:
            dominant_window_size = WindowSizeSelector(
                method='dff').get_window_size(input_data.features)
            self.patch_len = 2 * dominant_window_size
        train_loader, val_loader = self._prepare_data(
            input_data.features, self.patch_len, split_data=False)

    self.test_patch_len = self.patch_len
    loss_fn, optimizer = self._init_model(input_data)
    model = self._train_loop(train_loader=train_loader, loss_fn=loss_fn,
                             optimizer=optimizer, val_loader=val_loader)
    # self.model_list.append(model)
    return train_loader

@convert_inputdata_to_torch_time_series_dataset
def _create_dataset(self,
ts: InputData,
flag: str = 'test',
batch_size: int = 16,
freq: int = 1):
return ts

def _prepare_data(self,
                  ts,
                  patch_len,
                  split_data: bool = True,
                  validation_blocks: Optional[int] = None):
    train_data = self.split_data(ts)
    if split_data:
        train_data, val_data = train_test_data_setup(
            train_data, validation_blocks=validation_blocks)
        _, train_data.features, train_data.target = transform_features_and_target_into_lagged(
            train_data, self.horizon, patch_len)
        _, val_data.features, val_data.target = transform_features_and_target_into_lagged(
            val_data, self.horizon, patch_len)
        val_loader = self.__create_torch_loader(val_data)
        train_loader = self.__create_torch_loader(train_data)
        return train_loader, val_loader
    _, train_data.features, train_data.target = transform_features_and_target_into_lagged(
        train_data, self.horizon, patch_len)
    train_loader = self.__create_torch_loader(train_data)
    return train_loader, None
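
A toy illustration of the lagged windowing idea only, not the fedot transform_features_and_target_into_lagged implementation: each feature row is a window of past values and each target row the following horizon.

    import numpy as np

    series = np.arange(10)
    window, horizon = 4, 2
    features = np.lib.stride_tricks.sliding_window_view(series, window)[:-horizon]
    target = np.lib.stride_tricks.sliding_window_view(series[window:], horizon)
    print(features[0], target[0])  # [0 1 2 3] [4 5]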


def split_data(self, input_data):

time_series = pd.DataFrame(input_data)
task = Task(TaskTypesEnum.ts_forecasting,
TsForecastingParams(forecast_length=self.horizon))
if 'datetime' in time_series.columns:
idx = pd.to_datetime(time_series['datetime'].values)
else:
    idx = time_series.index.values

time_series = time_series.values
train_input = InputData(idx=idx,
features=time_series.flatten(),
target=time_series.flatten(),
task=task,
data_type=DataTypesEnum.ts)

return train_input
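
A minimal usage sketch of the wrapping performed above (the classes are the fedot imports from this file; the 14-step horizon is an arbitrary example value):

    import numpy as np

    ts = np.sin(np.linspace(0, 12, 300))
    task = Task(TaskTypesEnum.ts_forecasting, TsForecastingParams(forecast_length=14))
    train_input = InputData(idx=np.arange(len(ts)), features=ts, target=ts,
                            task=task, data_type=DataTypesEnum.ts)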

def __create_torch_loader(self, train_data):

train_dataset = self._create_dataset(train_data)
train_loader = torch.utils.data.DataLoader(
data.TensorDataset(train_dataset.x, train_dataset.y),
batch_size=self.batch_size, shuffle=False)
return train_loader
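
A standalone sketch of the TensorDataset/DataLoader pattern used above (tensor shapes are assumed for illustration):

    import torch
    import torch.utils.data as data

    x = torch.randn(32, 8, 1)  # 32 lagged windows of length 8, one feature
    y = torch.randn(32, 2)     # matching 2-step targets
    loader = data.DataLoader(data.TensorDataset(x, y), batch_size=16, shuffle=False)
    for xb, yb in loader:
        print(xb.shape, yb.shape)  # torch.Size([16, 8, 1]) torch.Size([16, 2])
        break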


20 changes: 13 additions & 7 deletions fedot_ind/core/models/nn/network_modules/losses.py
@@ -256,17 +256,17 @@ def forward(self, input: Tensor, target: Tensor) -> Tensor:
class DistributionLoss(nn.Module):
    distribution_class: distributions.Distribution
    distribution_arguments: List[str]
    quantiles: List[float] = [.05, .25, .5, .75, .95]
+    need_affine = True

    def __init__(
-        self, quantiles: List[float] = [.05, .25, .5, .75, .95], reduction="mean",
+        self, reduction="mean",
    ):
        super().__init__()
-        self.quantiles = quantiles
-        self.reduction = getattr(torch, reduction)
+        self.reduction = getattr(torch, reduction) if reduction else lambda x: x


-    def map_x_to_distribution(self, x: torch.Tensor) -> distributions.Distribution:
+    @classmethod
+    def map_x_to_distribution(cls, x: torch.Tensor) -> distributions.Distribution:
"""
Map a tensor of parameters to a probability distribution.
@@ -277,11 +277,16 @@ def map_x_to_distribution(self, x: torch.Tensor) -> distributions.Distribution:
distributions.Distribution: torch probability distribution as defined in the
class attribute ``distribution_class``
"""
-        distr = self._map_x_to_distribution(x)
-        if self.need_affine:
+        distr = cls._map_x_to_distribution(x)
+        if cls.need_affine:
            scaler = distributions.AffineTransform(loc=x[..., 0], scale=x[..., 1])
            distr = distributions.TransformedDistribution(distr, [scaler])
        return distr
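
For illustration, this is how the affine rescaling behaves in torch.distributions: the transform shifts and scales samples from the base distribution.

    import torch
    from torch import distributions

    base = distributions.Normal(torch.tensor(0.), torch.tensor(1.))
    affine = distributions.AffineTransform(loc=torch.tensor(10.), scale=torch.tensor(2.))
    rescaled = distributions.TransformedDistribution(base, [affine])
    sample = rescaled.sample((1000,))
    print(sample.mean(), sample.std())  # approximately 10 and 2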

    @classmethod
    def _map_x_to_distribution(cls, x):
        raise NotImplementedError


def forward(self, y_pred: torch.Tensor, y_actual: torch.Tensor) -> torch.Tensor:
"""
@@ -309,6 +314,7 @@ class NormalDistributionLoss(DistributionLoss):
distribution_arguments = ["loc", "scale"]
need_affine = False

    @classmethod
    def _map_x_to_distribution(cls, x: torch.Tensor) -> distributions.Normal:
        loc = x[..., -2]
        scale = x[..., -1]
