From ba72e5a9ae8eeba6c3390c8292942b618b42b5ae Mon Sep 17 00:00:00 2001 From: Jorgedavyd Date: Fri, 31 May 2024 02:37:54 +0000 Subject: [PATCH] Automatically formatted with black --- lightorch/nn/__init__.py | 2 +- lightorch/nn/fourier.py | 58 +++++++++++++----- lightorch/nn/functional.py | 13 ++-- lightorch/nn/utils.py | 122 +++++++++++++++++++------------------ tests/test_nn.py | 56 +++++++++-------- tests/utils.py | 2 + 6 files changed, 148 insertions(+), 105 deletions(-) diff --git a/lightorch/nn/__init__.py b/lightorch/nn/__init__.py index fcb17b1..43c00dc 100644 --- a/lightorch/nn/__init__.py +++ b/lightorch/nn/__init__.py @@ -7,4 +7,4 @@ from .dnn import * from .kan import * from .monte_carlo import * -from .utils import * \ No newline at end of file +from .utils import * diff --git a/lightorch/nn/fourier.py b/lightorch/nn/fourier.py index f543b64..7691931 100644 --- a/lightorch/nn/fourier.py +++ b/lightorch/nn/fourier.py @@ -8,6 +8,7 @@ from typing import Tuple, Sequence from itertools import chain + class _FourierConvNd(nn.Module): def __init__( self, @@ -25,9 +26,9 @@ def __init__( ) -> None: self.n = n if isinstance(kernel_size, tuple): - assert (n == n), f'Not valid kernel size for {n}-convolution' + assert n == n, f"Not valid kernel size for {n}-convolution" else: - kernel_size = (kernel_size, )*n + kernel_size = (kernel_size,) * n super().__init__() self.factory_kwargs = {"device": device, "dtype": dtype} @@ -57,14 +58,16 @@ def __init__( self.bias = None self._init_parameters() - + def get_padding(self, padding: Tuple[int, ...] | int) -> Sequence[int]: if isinstance(padding, tuple): - assert(len(padding) == self.n), f'Not valid padding scheme for {self.n}-convolution' - return tuple(*chain.from_iterable([(i, )*2 for i in reversed(padding)])) + assert ( + len(padding) == self.n + ), f"Not valid padding scheme for {self.n}-convolution" + return tuple(*chain.from_iterable([(i,) * 2 for i in reversed(padding)])) else: - return tuple(*chain.from_iterable([(padding, )*2 for _ in range(self.n)])) - + return tuple(*chain.from_iterable([(padding,) * 2 for _ in range(self.n)])) + def _fourier_space(self) -> Tensor: # probably deprecated if self.bias is not None: @@ -81,8 +84,33 @@ def _init_parameters(self) -> None: class _FourierDeconvNd(_FourierConvNd): - def __init__(self, n: int, in_channels: int, out_channels: int, kernel_size: Tuple[int] | int, padding: Tuple[int], bias: bool = True, eps: float = 0.00001, pre_fft: bool = True, post_ifft: bool = False, device=None, dtype=None) -> None: - super().__init__(n, in_channels, out_channels, kernel_size, padding, bias, eps, pre_fft, post_ifft, device, dtype) + def __init__( + self, + n: int, + in_channels: int, + out_channels: int, + kernel_size: Tuple[int] | int, + padding: Tuple[int], + bias: bool = True, + eps: float = 0.00001, + pre_fft: bool = True, + post_ifft: bool = False, + device=None, + dtype=None, + ) -> None: + super().__init__( + n, + in_channels, + out_channels, + kernel_size, + padding, + bias, + eps, + pre_fft, + post_ifft, + device, + dtype, + ) class FourierConv1d(_FourierConvNd): @@ -99,7 +127,7 @@ def forward(self, input: Tensor) -> Tensor: f.pad(input, self.padding, mode="constant", value=0), self.one, self.weight, - self.bias + self.bias, ) if self.ifft: return self.ifft(out) @@ -120,7 +148,7 @@ def forward(self, input: Tensor) -> Tensor: f.pad(input, self.padding, "constant", value=0), self.one, self.weight, - self.bias + self.bias, ) if self.ifft: @@ -142,7 +170,7 @@ def forward(self, input: Tensor) -> Tensor: f.pad(input, self.padding, "constant", value=0), self.one, self.weight, - self.bias + self.bias, ) if self.ifft: return self.ifft(out) @@ -163,7 +191,7 @@ def forward(self, input: Tensor) -> Tensor: f.pad(input, self.padding, "constant", value=0), self.one, self.weight, - self.bias + self.bias, ) if self.ifft: return self.ifft(out) @@ -184,7 +212,7 @@ def forward(self, input: Tensor) -> Tensor: f.pad(input, self.padding, "constant", value=0), self.one, self.weight, - self.bias + self.bias, ) if self.ifft: return self.ifft(out) @@ -205,7 +233,7 @@ def forward(self, input: Tensor) -> Tensor: f.pad(input, self.padding, "constant", value=0), self.one, self.weight, - self.bias + self.bias, ) if self.ifft: return self.ifft(out) diff --git a/lightorch/nn/functional.py b/lightorch/nn/functional.py index ff19c0a..ca79989 100644 --- a/lightorch/nn/functional.py +++ b/lightorch/nn/functional.py @@ -11,28 +11,29 @@ def _fourierconvNd( n: int, x: Tensor, weight: Tensor, bias: Tensor | None, eps: float = 1e-5 ) -> Tensor: # To fourier space - weight = fftn(weight, dim = (-i for i in range(1, n+1))) - + weight = fftn(weight, dim=(-i for i in range(1, n + 1))) + # weight -> 1, 1, out channels, *kernel_size x *= weight.reshape(1, 1, *weight.shape) + eps # Convolution in the fourier space if bias is not None: - bias = fftn(bias, dim = (-i for i in range(1, n+1))) + bias = fftn(bias, dim=(-i for i in range(1, n + 1))) return x + bias.reshape(1, 1, -1, *[1 for _ in range(n)]) return x + def _fourierdeconvNd( n: int, x: Tensor, weight: Tensor, bias: Tensor | None, eps: float = 1e-5 ) -> Tensor: # To fourier space - weight = fftn(weight, dim = (-i for i in range(1, n+1))) - + weight = fftn(weight, dim=(-i for i in range(1, n + 1))) + # weight -> 1, 1, out channels, *kernel_size x /= weight.reshape(1, 1, *weight.shape) + eps # Convolution in the fourier space if bias is not None: - bias = fftn(bias, dim = (-i for i in range(1, n+1))) + bias = fftn(bias, dim=(-i for i in range(1, n + 1))) return x + bias.reshape(1, 1, -1, *[1 for _ in range(n)]) return x diff --git a/lightorch/nn/utils.py b/lightorch/nn/utils.py index 3d737d0..239ed35 100644 --- a/lightorch/nn/utils.py +++ b/lightorch/nn/utils.py @@ -1,86 +1,92 @@ from torch import nn, Tensor from typing import List, Sequence from torchvision.models import ( - VGG19_Weights, vgg19, - VGG16_Weights, vgg16, - ResNet50_Weights, resnet50, - resnet101, ResNet101_Weights + VGG19_Weights, + vgg19, + VGG16_Weights, + vgg16, + ResNet50_Weights, + resnet50, + resnet101, + ResNet101_Weights, ) VALID_MODELS = { - 'vgg19': { - 'model': vgg19, - 'weights': VGG19_Weights, - 'valid_layers': [ - i for i in range(37) - ] + "vgg19": { + "model": vgg19, + "weights": VGG19_Weights, + "valid_layers": [i for i in range(37)], }, - 'vgg16': { - 'model': vgg16, - 'weights': VGG16_Weights, - 'valid_layers': [ - i for i in range(31) - ] + "vgg16": { + "model": vgg16, + "weights": VGG16_Weights, + "valid_layers": [i for i in range(31)], + }, + "resnet50": { + "model": resnet50, + "weights": ResNet50_Weights, + "valid_layers": [ + "conv1", + "bn1", + "relu", + "maxpool", + "layer1", + "layer2", + "layer3", + "layer4", + "avgpool", + "fc", + ], + }, + "resnet101": { + "model": resnet101, + "weights": ResNet101_Weights, + "valid_layers": [ + "conv1", + "bn1", + "relu", + "maxpool", + "layer1", + "layer2", + "layer3", + "layer4", + "avgpool", + "fc", + ], }, - 'resnet50': { - 'model': resnet50, - 'weights': ResNet50_Weights, - 'valid_layers': [ - 'conv1', - 'bn1', - 'relu', - 'maxpool', - 'layer1', - 'layer2', - 'layer3', - 'layer4', - 'avgpool', - 'fc', - ] - }, - 'resnet101': { - 'model': resnet101, - 'weights': ResNet101_Weights, - 'valid_layers': [ - 'conv1', - 'bn1', - 'relu', - 'maxpool', - 'layer1', - 'layer2', - 'layer3', - 'layer4', - 'avgpool', - 'fc', - ] - }, } class FeatureExtractor(nn.Module): - def __init__(self, layers: Sequence[int] = [4,9,18], model_str: str = 'vgg19') -> None: - assert(model_str in VALID_MODELS), f'Model not in {VALID_MODELS.keys()}' - assert (list(set(layers)) == layers), 'Not valid repeated inputs' + def __init__( + self, layers: Sequence[int] = [4, 9, 18], model_str: str = "vgg19" + ) -> None: + assert model_str in VALID_MODELS, f"Model not in {VALID_MODELS.keys()}" + assert list(set(layers)) == layers, "Not valid repeated inputs" hist: List = [] for layer in layers: - valid_models: List[str] = VALID_MODELS[model_str]['valid_layers'] + valid_models: List[str] = VALID_MODELS[model_str]["valid_layers"] num = valid_models.index(layer) hist.append(num) - assert (sorted(hist) == hist), 'Not ordered inputs' + assert sorted(hist) == hist, "Not ordered inputs" super().__init__() self.model_str: str = model_str self.layers = list(map(str, layers)) - self.model = VALID_MODELS[model_str](weights = VALID_MODELS[model_str][1].IMAGENET1K_V1) + self.model = VALID_MODELS[model_str]( + weights=VALID_MODELS[model_str][1].IMAGENET1K_V1 + ) for param in self.model.parameters(): param.requires_grad = False # Setting the transformation - self.transform = VALID_MODELS[model_str][1].IMAGENET1K_V1.transforms(antialias=True) - + self.transform = VALID_MODELS[model_str][1].IMAGENET1K_V1.transforms( + antialias=True + ) + def forward(self, input: Tensor) -> List[Tensor]: features = [] - - if 'vgg' in self.model_str: + + if "vgg" in self.model_str: for name, layer in self.model.features.named_children(): input = layer(input) if name in self.layers: diff --git a/tests/test_nn.py b/tests/test_nn.py index 6065fba..7daf3cb 100644 --- a/tests/test_nn.py +++ b/tests/test_nn.py @@ -13,11 +13,12 @@ model: nn.Module = Model(32) randint: int = random.randint(-100, 100) + def test_complex() -> None: sample_input = torch.randn(32, 10) + 1j * torch.randn(32, 10) layer = Complex(nn.Linear(10, 20)) result = layer(sample_input) - assert result is not None, 'Complex failed' + assert result is not None, "Complex failed" def test_tv() -> None: @@ -25,32 +26,35 @@ def test_tv() -> None: result = loss(input=input) assert result is not None, "TV loss failed" -#Integrated + +# Integrated def test_style() -> None: - feature: nn.Module = FeatureExtractor([8, 12], 'vgg16') + feature: nn.Module = FeatureExtractor([8, 12], "vgg16") loss = StyleLoss(feature, input, randint) result = loss(input=input, target=target, feature_extractor=False) assert result is not None, "StyleLoss failed" -#Integrated +# Integrated def test_perc() -> None: - feature: nn.Module = FeatureExtractor([8, 12], 'vgg16') + feature: nn.Module = FeatureExtractor([8, 12], "vgg16") loss = PerceptualLoss(feature, input, randint) result = loss(input=input, target=target, feature_extractor=False) assert result is not None, "PerceptualLoss failed" def test_psnr() -> None: - loss = MSELoss(factor = randint) + loss = MSELoss(factor=randint) result = loss(input=input, target=target) assert result is not None, "MSE failed" + def test_psnr() -> None: - loss = CrossEntropyLoss(factor = randint) + loss = CrossEntropyLoss(factor=randint) result = loss(input=input, target=target) assert result is not None, "CrossEntropy failed" + def test_psnr() -> None: loss = PeakNoiseSignalRatio(1, randint) result = loss(input=input, target=target) @@ -60,10 +64,7 @@ def test_psnr() -> None: def test_lagrange() -> None: lambd = torch.tensor([randint for _ in range(2)], dtype=torch.float32) loss = LagrangianFunctional( - MSELoss(factor=1), - MSELoss(factor=1), - MSELoss(factor=1), - lambd=lambd + MSELoss(factor=1), MSELoss(factor=1), MSELoss(factor=1), lambd=lambd ) result = loss(input=input, target=target) assert result is not None, "LagrangianFunctional failed" @@ -87,37 +88,41 @@ def test_fourier2d() -> None: sample_input: Tensor = torch.randn(32, 3, 256, 256) # batch size, input_size model = nn.Sequential( FourierConv2d(3, 10, 5, 1, pre_fft=True), - FourierConv2d(10, 20, 5, 1, post_ifft=True) + FourierConv2d(10, 20, 5, 1, post_ifft=True), ) output = model(sample_input) assert output.shape == (32, 20, 256, 256), "FourierConv2d failed" model = nn.Sequential( FourierDeconv2d(3, 10, 5, 1, pre_fft=True), - FourierDeconv2d(10, 20, 5, 1, post_ifft=True) + FourierDeconv2d(10, 20, 5, 1, post_ifft=True), ) output = model(sample_input) assert output.shape == (32, 20, 256, 256), "FourierDeconv2d failed" + def test_fourier1d() -> None: sample_input: Tensor = torch.randn(32, 1, 10) # batch size, channels, input_size model = nn.Sequential( FourierConv1d(1, 3, 5, 1, pre_fft=True), - FourierConv1d(3, 5, 5, 1, post_ifft=True) + FourierConv1d(3, 5, 5, 1, post_ifft=True), ) output = model(sample_input) assert output.shape == (32, 5, 10), "FourierConv1d failed" model = nn.Sequential( FourierDeconv1d(1, 3, 5, 1, pre_fft=True), - FourierDeconv1d(3, 5, 5, 1, post_ifft=True) + FourierDeconv1d(3, 5, 5, 1, post_ifft=True), ) output = model(sample_input) assert output.shape == (32, 74, 10), "FourierDeconv1d failed" + def test_fourier3d() -> None: - sample_input: Tensor = torch.randn(32, 3, 5, 256, 256) # batch size, channels, frames, height, width + sample_input: Tensor = torch.randn( + 32, 3, 5, 256, 256 + ) # batch size, channels, frames, height, width model = nn.Sequential( FourierConv3d(3, 2, 8, pre_fft=True), FourierConv3d(2, 1, 8, post_ifft=True), @@ -134,22 +139,24 @@ def test_fourier3d() -> None: def test_partial() -> None: - sample_input: Tensor = torch.randn(32, 3, 256, 256) # batch size, channels, height, width + sample_input: Tensor = torch.randn( + 32, 3, 256, 256 + ) # batch size, channels, height, width mask = create_mask() - model = nn.Sequential( - PartialConv2d(3, 5, 3, 1, 1), - PartialConv2d(5, 5, 3, 1, 1) - ) + model = nn.Sequential(PartialConv2d(3, 5, 3, 1, 1), PartialConv2d(5, 5, 3, 1, 1)) output = model(sample_input, mask) assert output.shape == (32, 5, 256, 256), "PartialConv2d failed" def test_normalization() -> None: - sample_input: Tensor = torch.randn(32, 20, 10) # batch size, sequence_length, input_size + sample_input: Tensor = torch.randn( + 32, 20, 10 + ) # batch size, sequence_length, input_size norm = RootMeanSquaredNormalization(dim=10) output = norm(sample_input) assert output.shape == (32, 20, 10), "RootMeanSquaredNormalization failed" + # Integrated def test_monte_carlo() -> None: sample_input: Tensor = torch.randn(32, 10) # batch size, input_size @@ -157,16 +164,15 @@ def test_monte_carlo() -> None: fc_layer=DeepNeuralNetwork( in_features=10, layers=(20, 20, 1), - activations=(nn.ReLU(), nn.ReLU(), nn.Sigmoid()) + activations=(nn.ReLU(), nn.ReLU(), nn.Sigmoid()), ), dropout=0.5, - n_sampling=50 + n_sampling=50, ) output = model(sample_input) assert output.shape == (32, 1), "MonteCarloFC failed" - def test_kan() -> None: # Placeholder for future implementation raise NotImplementedError("KAN test not implemented") diff --git a/tests/utils.py b/tests/utils.py index 5f15a0e..c0b2cf8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -5,6 +5,7 @@ from torch.utils.data import Dataset, DataLoader import random + def create_inputs(*size) -> Tensor: return torch.randn(*size) @@ -52,6 +53,7 @@ def val_dataloader(self) -> DataLoader: pin_memory=self.pin_memory, ) + def create_mask(): # Create a rectangle n = random.randint(1, 31) # Random integer for n