diff --git a/.circleci/config.yml b/.circleci/config.yml index 92e1102..35cd01e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -107,6 +107,32 @@ jobs: export NEURAL_STYLE_TRANSFER_RELEASE_VERSION=$(python scripts/parse_package_version.py) tox -e deploy -vv + run-algorithm: + executor: ubuntu-1604-vm + environment: + NST_OUTPUT: /nst-output + IMAGE: boromir674/neural-style-transfer + IMAGE_1: /app/tests/data/canoe_water_w300-h225.jpg + IMAGE_2: /app/tests/data/blue-red_w300-h225.jpg + ITER: 100 + steps: + - run: + name: Create a directory to store the generated images + command: sudo mkdir $NST_OUTPUT + - run: + name: Pull an image (using docker) where both code and a pretrained model are present + command: docker pull $IMAGE + - run: + name: Run a Neural Style Transfer Algorithm for 22 iterations (using the pretrained VGG Deep Neural Network) + command: docker run -it --rm -v $NST_OUTPUT:$NST_OUTPUT $IMAGE $IMAGE_1 $IMAGE_2 --iterations $ITER --location $NST_OUTPUT + - store_artifacts: + path: /nst-output + destination: nst-output + - persist_to_workspace: + root: / + paths: + - nst-output + integration-test: executor: py38-docker-image steps: @@ -148,10 +174,12 @@ workflows: version: 2 build_accept: jobs: + - build_n_test: filters: tags: only: /.*/ # runs for all branches and all tags + - send-coverage-to-codacy: requires: - build_n_test @@ -164,6 +192,22 @@ workflows: filters: tags: only: /.*/ + - run-algorithm: + requires: + - build_n_test + filters: + branches: + only: + - regression-test + - inspect-previous-artifacts: + type: approval + requires: + - run-algorithm + filters: + branches: + only: + - regression-test + - visualize_dependency_graphs: filters: branches: @@ -171,6 +215,7 @@ workflows: - master - dev - release-staging + # - build-documentation: # filters: # branches: diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8fa296f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,48 @@ +*/__pycache__ +htmlcov +__pycache__ +*.pyc +*.pyo +*.pyd +.Python +env +pip-log.txt +pip-delete-this-directory.txt +.tox +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.log +.git +.mypy_cache +.pytest_cache +.hypothesis + +node_modules +pretrained* +style-images +local/ +output/ +nst_output/ +env/ +env-dev/ +dist/ +build/ +test-results/ +dependency-graphs/ +uml-diagrams +.circleci +.vscode +.bettercodehub.yml +.gitignore +.prospector.yml +.pylintrc + +mypy.ini +package.json +package-lock.json +tox.ini +Dockerfile diff --git a/.gitignore b/.gitignore index 829b4af..3fe3486 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ build/ *output uml-diagrams dependency-graphs +coverage.xml diff --git a/.prospector.yml b/.prospector.yml index c248258..bf4deef 100644 --- a/.prospector.yml +++ b/.prospector.yml @@ -72,8 +72,9 @@ pep257: mypy: run: false options: - ignore-missing-imports: true - follow-imports: skip + ignore-missing-imports: false + follow-imports: normal + vulture: run: false diff --git a/.pylintrc b/.pylintrc index f4b39eb..f3a0fb1 100644 --- a/.pylintrc +++ b/.pylintrc @@ -225,8 +225,9 @@ contextmanager-decorators=contextlib.contextmanager # List of members which are set dynamically and missed by pylint inference # system, and so shouldn't trigger E1101 when accessed. Python regular -# expressions are accepted. -generated-members=_transform|self\.objects|self\._observers +# expressions are accepted. Things coming from a metaclass or a dynamic attribute set. +# Can solve E1101: Class 'X' has no 'y' member (no-member) +generated-members=self\.objects|self\._observers|compute_gram|_content_image|_style_image|adapter_type|mapping # Tells whether missing members accessed in mixin class should be ignored. A # mixin class is detected if its name ends with "mixin" (case insensitive). @@ -349,13 +350,22 @@ function-naming-style=snake_case #function-rgx= # Good variable names which should always be accepted, separated by a comma. +# Can solve C0103: Variable name "x" doesn't conform to snake_case naming style (invalid-name) good-names=i, j, k, ex, Run, _, - T + T, + a_C, + a_G, + a_S, + n_H, + n_W, + n_C,GS,GG,J_style,J_content,J_style_layer,W,b, + Js,Jt,Jc, + im # Good variable names regexes, separated by a comma. If names match any regex, # they will always be accepted diff --git a/CHANGELOG.rst b/CHANGELOG.rst new file mode 100644 index 0000000..b9ab9f3 --- /dev/null +++ b/CHANGELOG.rst @@ -0,0 +1,19 @@ +0.6.1 (2021-12-01) +------------------ + +Changes +^^^^^^^ + +test +"""" +- test algorithm & conditionally mock production pretrained model + + +documentation +""""""""""""" +- document how to use the docker image hosted on docker hub + + +ci +"" +- run regression test on ci server diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8b0528f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.8-slim + +WORKDIR /app + +ADD imagenet-vgg-verydeep-19.mat.tar . + + +COPY requirements/dev.txt reqs.txt +RUN pip install -r reqs.txt + +COPY . . + +RUN pip install -e . + +COPY tests tests + +ENV AA_VGG_19 /app/pretrained_model_bundle/imagenet-vgg-verydeep-19.mat +ENTRYPOINT [ "neural-style-transfer" ] diff --git a/README.rst b/README.rst index 325b0c5..ea3415c 100644 --- a/README.rst +++ b/README.rst @@ -29,7 +29,7 @@ This Python package runs a Neural Style Tranfer algorithm on input `content` and - | |docker| |image_size| * - code quality - - |better_code_hub| |code_climate| |maintainability| |codacy| |scrutinizer| + - |better_code_hub| |codacy| |code_climate| |maintainability| |scrutinizer| @@ -147,9 +147,12 @@ and the pretrained model are present. That way you can immediately start creatin docker pull boromir674/neural-style-transfer - mkdir nst-output + export NST_OUTPUT=/home/$USER/nst-output - docker run -it --rm -v nst-output:/app/nst-output boromir674/neural-style-transfer + CONTENT=/path/to/content-image.jpg + STYLE=/path/to/style-image.jpg + + docker run -it --rm -v $NST_OUTPUT:/nst-output boromir674/neural-style-transfer $STYLE $CONTENT --iteratins 200 --location /nst-output @@ -161,7 +164,7 @@ and the pretrained model are present. That way you can immediately start creatin .. |codecov| image:: https://codecov.io/gh/boromir674/neural-style-transfer/branch/master/graph/badge.svg?token=3POTVNU0L4 :alt: Codecov - :target: https://codecov.io/gh/boromir674/neural-style-transfer + :target: https://app.codecov.io/gh/boromir674/neural-style-transfer/branch/master @@ -177,9 +180,9 @@ and the pretrained model are present. That way you can immediately start creatin :alt: PyPI - Python Version :target: https://pypi.org/project/artificial-artwork -.. |commits_since| image:: https://img.shields.io/github/commits-since/boromir674/neural-style-transfer/v0.6/master?color=blue&logo=Github +.. |commits_since| image:: https://img.shields.io/github/commits-since/boromir674/neural-style-transfer/v0.6.1/master?color=blue&logo=Github :alt: GitHub commits since tagged version (branch) - :target: https://github.com/boromir674/neural-style-transfer/compare/v0.6..master + :target: https://github.com/boromir674/neural-style-transfer/compare/v0.6.1..master @@ -187,22 +190,18 @@ and the pretrained model are present. That way you can immediately start creatin :alt: Better Code Hub :target: https://bettercodehub.com/ - .. |codacy| image:: https://app.codacy.com/project/badge/Grade/07b27ac547a94708aefc5e845d2b6d01 :alt: Codacy :target: https://www.codacy.com/gh/boromir674/neural-style-transfer/dashboard?utm_source=github.com&utm_medium=referral&utm_content=boromir674/neural-style-transfer&utm_campaign=Badge_Grade - .. |code_climate| image:: https://api.codeclimate.com/v1/badges/2ea98633f88b75e87d1a/maintainability :alt: Maintainability :target: https://codeclimate.com/github/boromir674/neural-style-transfer/maintainability - .. |maintainability| image:: https://img.shields.io/codeclimate/tech-debt/boromir674/neural-style-transfer?logo=CodeClimate :alt: Technical Debt :target: https://codeclimate.com/github/boromir674/neural-style-transfer/maintainability - .. |scrutinizer| image:: https://img.shields.io/scrutinizer/quality/g/boromir674/neural-style-transfer/master?logo=scrutinizer-ci :alt: Scrutinizer code quality :target: https://scrutinizer-ci.com/g/boromir674/neural-style-transfer/?branch=master @@ -213,9 +212,9 @@ and the pretrained model are present. That way you can immediately start creatin :alt: PyPI Package latest release :target: https://pypi.org/project/topic-modeling-toolkit -.. |python_versions| image:: https://img.shields.io/pypi/pyversions/topic-modeling-toolkit.svg +.. |python_versions| image:: https://img.shields.io/pypi/pyversions/artificial-artwork.svg :alt: Supported versions - :target: https://pypi.org/project/topic-modeling-toolkit + :target: https://pypi.org/project/artificial-artwork/ diff --git a/setup.cfg b/setup.cfg index ee468e8..baa1491 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] ## Setuptools specific information name = artificial_artwork -version = 0.6 +version = 0.6.1 description = Create artificial artwork by transfering the appearance of one image (eg a famous painting) to another user-supplied image (eg your favourite photograph). long_description = file: README.rst long_description_content_type = text/x-rst @@ -14,6 +14,7 @@ author_email = k.lampridis@hotmail.com # represents the web home page of the project url = https://github.com/boromir674/neural-style-transfer +download_url = https://github.com/boromir674/neural-style-transfer/archive/v0.6.1.tar.gz ## PyPi specific information project_urls = @@ -78,8 +79,11 @@ skip = .tox,venv,env,env-dev default_section = THIRDPARTY known_first_party = src,tests + [semantic_release] version_variable = src/artificial_artwork/__version__.py:__version__ +version_source=source + [check-manifest] ignore = diff --git a/src/artificial_artwork/__version__.py b/src/artificial_artwork/__version__.py new file mode 100644 index 0000000..8411e55 --- /dev/null +++ b/src/artificial_artwork/__version__.py @@ -0,0 +1 @@ +__version__ = '0.6.1' diff --git a/src/artificial_artwork/algorithm.py b/src/artificial_artwork/algorithm.py index 68af6ca..1168d91 100644 --- a/src/artificial_artwork/algorithm.py +++ b/src/artificial_artwork/algorithm.py @@ -1,7 +1,5 @@ import attr -from .style_layer_selector import NSTLayersSelection - @attr.s class NSTAlgorithm: @@ -10,11 +8,7 @@ class NSTAlgorithm: @attr.s class AlogirthmParameters: - # TODO remove content and style images and output_path - # retain only algorithm parameters (variables governing how the algo will behave) - # from the algo input (runtime objects that are the INPUT to the algo) content_image = attr.ib() style_image = attr.ib() - style_layers = attr.ib(converter=NSTLayersSelection.from_tuples) termination_condition = attr.ib() output_path = attr.ib() diff --git a/src/artificial_artwork/cli.py b/src/artificial_artwork/cli.py index cb71cbd..08833b0 100644 --- a/src/artificial_artwork/cli.py +++ b/src/artificial_artwork/cli.py @@ -1,45 +1,33 @@ +import sys import click +import numpy as np from .disk_operations import Disk from .styling_observer import StylingObserver from .algorithm import NSTAlgorithm, AlogirthmParameters from .nst_tf_algorithm import NSTAlgorithmRunner -from .termination_condition.termination_condition import TerminationConditionFacility -from .termination_condition_adapter import TerminationConditionAdapterFactory -from .nst_image import ImageManager +from .termination_condition_adapter_factory import TerminationConditionAdapterFactory +from .nst_image import ImageManager, noisy, convert_to_uint8 +from .production_networks import NetworkDesign +from .pretrained_model import ModelHandlerFacility -@click.command() -@click.argument('content_image') -@click.argument('style_image') -@click.option('--interactive', '-i', type=bool, default=True) -@click.option('--iterations', '-it', type=int, default=100) -@click.option('--location', '-l', type=str, default='.') -def cli(content_image, style_image, interactive, iterations, location): - - TERMINATION_CONDITION = 'max-iterations' - STYLE_LAYERS = [ - ('conv1_1', 0.2), - ('conv2_1', 0.2), - ('conv3_1', 0.2), - ('conv4_1', 0.2), - ('conv5_1', 0.2), - ] +def load_pretrained_model_functions(): + # future work: discover dynamically the modules inside the pre_trained_model + # package + from .pre_trained_models import vgg + return vgg - import numpy as np - from artificial_artwork.image.image_operations import noisy, reshape_image, subtract, convert_to_uint8 +def read_images(content, style): # todo dynamically find means means = np.array([123.68, 116.779, 103.939]).reshape((1,1,1,3)) # means - image_manager = ImageManager([ - lambda matrix: reshape_image(matrix, ((1,) + matrix.shape)), - lambda matrix: subtract(matrix, means), # input image must have 3 channels! - ]) + image_manager = ImageManager.default(means) # probably load each image in separate thread and then join - image_manager.load_from_disk(content_image, 'content') - image_manager.load_from_disk(style_image, 'style') + image_manager.load_from_disk(content, 'content') + image_manager.load_from_disk(style, 'style') if not image_manager.images_compatible: print("Given CONTENT image '{content_image}' has 'height' x 'width' x " @@ -48,46 +36,56 @@ def cli(content_image, style_image, interactive, iterations, location): f"'color_channels': {image_manager.style_image.matrix.shape}") print('Expected to receive images (matrices) of identical shape') print('Exiting..') - exit(1) + sys.exit(1) - # image_factory = ImageFactory(Disk.load_image) - # content_image = image_factory.from_disk(content_image, reshape_and_normalize_pipeline) - # style_image = image_factory.from_disk(style_image, reshape_and_normalize_pipeline) + return image_manager.content_image, image_manager.style_image - termination_condition = TerminationConditionFacility.create( - TERMINATION_CONDITION, iterations) - termination_condition_adapter = TerminationConditionAdapterFactory.create( - TERMINATION_CONDITION, termination_condition) - print(f' -- Termination Condition: {termination_condition}') + +@click.command() +@click.argument('content_image') +@click.argument('style_image') +@click.option('--iterations', '-it', type=int, default=100, show_default=True) +@click.option('--location', '-l', type=str, default='.') +def cli(content_image, style_image, iterations, location): + + termination_condition = 'max-iterations' + + content_image, style_image = read_images(content_image, style_image) + + load_pretrained_model_functions() + model_design = type('ModelDesign', (), { + 'pretrained_model': ModelHandlerFacility.create('vgg'), + 'network_design': NetworkDesign.from_default_vgg() + }) + model_design.pretrained_model.load_model_layers() + + termination_condition = TerminationConditionAdapterFactory.create( + termination_condition, + iterations, + ) + + print(f' -- Termination Condition: {termination_condition.termination_condition}') algorithm_parameters = AlogirthmParameters( - image_manager.content_image, - image_manager.style_image, - STYLE_LAYERS, - termination_condition_adapter, + content_image, + style_image, + termination_condition, location, ) algorithm = NSTAlgorithm(algorithm_parameters) - # algorithm_runner = NSTAlgorithmRunner.default( - # algorithm, - # image_factory.image_processor.noisy - # ) - noisy_ratio = 0.6 # ratio - # NEW algorithm_runner = NSTAlgorithmRunner.default( - algorithm, - lambda matrix: noisy(matrix, noisy_ratio) + lambda matrix: noisy(matrix, noisy_ratio), ) algorithm_runner.progress_subject.add( - termination_condition_adapter, + termination_condition, ) - algorithm_runner.peristance_subject.add( + algorithm_runner.persistance_subject.add( StylingObserver(Disk.save_image, convert_to_uint8) ) - algorithm_runner.run() + algorithm_runner.run(algorithm, model_design) diff --git a/src/artificial_artwork/cost_computer.py b/src/artificial_artwork/cost_computer.py index 4e7c240..8474f13 100644 --- a/src/artificial_artwork/cost_computer.py +++ b/src/artificial_artwork/cost_computer.py @@ -2,48 +2,19 @@ from .nst_math import gram_matrix -class NSTCostComputer: - @classmethod - def compute(cls, J_content: float, J_style: float, alpha: float=10, beta: float=40) -> float: - """Compute the total cost function. - - Computes the total cost (aka learning error) as a linear combination of - the 'content cost' and 'style cost'. - - Total cost = alpha * J_content + beta * J_style - - Or mathematically expressed as: - - J(G) = alpha * J_content(C, G) + beta * J_style(S, G) - - where G: Generated Image, C: Content Image, S: Style Image - and J, J_content, J_style are mathematical functions - - Args: - J_content (float): content cost - J_style (float): style cost - alpha (float, optional): hyperparameter to weight content cost. Defaults to 10. - beta (float, optional): hyperparameter to weight style cost. Defaults to 40. - - Returns: - float: the total cost as defined by the formula above - """ - return alpha * J_content + beta * J_style - - class NSTContentCostComputer: @classmethod def compute(cls, a_C, a_G): """ Computes the content cost - + Assumption 1: a layer l has been chosen from a (Deep) Neural Network trained on images, that should act as a style model. Then: 1. a_C (3D volume) are the hidden layer activations in the chosen layer (l), when the C image is forward propagated (passed through) in the network. - + 2. a_G (3D volume) are the hidden layer activations in the chosen layer (l), when the G image is forward propagated (passed through) in the network. @@ -52,7 +23,7 @@ def compute(cls, a_C, a_G): Pseudo code for latex expression of the mathematical equation: - J_content(C, G) = \frac{1}{4 * n_H * n_W * n_C} * \sum_{all entries} (a^(C) - a^(G))^2 + J_content(C, G) = \\frac{1}{4 * n_H * n_W * n_C} * \\sum_{all entries} (a^(C) - a^(G))^2 OR J_content(C, G) = sum_{for all entries} (a^(C) - a^(G))^2 / (4 * n_H * n_W * n_C) @@ -61,30 +32,24 @@ def compute(cls, a_C, a_G): Args: a_C (tensor): of dimension (1, n_H, n_W, n_C), hidden layer activations representing content of the image C a_G (tensor): of dimension (1, n_H, n_W, n_C), hidden layer activations representing content of the image G - - Returns: + + Returns: (tensor): 1D with 1 scalar value computed using the equation above """ - # Retrieve dimensions from a_G - m, n_H, n_W, n_C = a_G.get_shape().as_list() - - # Reshape a_C and a_G - # a_C_unrolled = tf.reshape(a_C, [m, n_H * n_W, n_C]) - # a_G_unrolled = tf.reshape(a_G, [m, n_H * n_W, n_C]) - - # compute the cost - J_content = tf.reduce_sum(tf.square(a_C - a_G)) / (4 * n_H * n_W * n_C) - return J_content + # Dimensions of a_G (we ommit the first one, which equals to 1) + n_H, n_W, n_C = a_G.get_shape().as_list()[1:] + # Future work: Investigate performance when reshaping a_C and a_G before + # computing J_content + # a_C_unrolled = tf.reshape(a_C, [first_dim, n_H * n_W, n_C]) + # a_G_unrolled = tf.reshape(a_G, [first_dim, n_H * n_W, n_C]) -class GramMatrixComputer(type): - def __new__(mcs, *args, **kwargs): - class_object = super().__new__(mcs, *args, **kwargs) - class_object.compute_gram = gram_matrix - return class_object + J_content = tf.reduce_sum(tf.square(a_C - a_G)) / (4 * n_H * n_W * n_C) + return J_content -class NSTLayerStyleCostComputer(metaclass=GramMatrixComputer): +class NSTLayerStyleCostComputer: + compute_gram = gram_matrix @classmethod def compute(cls, a_S, a_G): @@ -92,22 +57,24 @@ def compute(cls, a_S, a_G): Compute the Style Cost, using the activations of the l style layer. Mathematical equation written in Latex code: - J^{[l]}_style (S, G) = \frac{1}{4 * n_c^2 * (n_H * n_W)^2} \sum^{n_C}_{i=1} \sum^{c_C}_{j=1} (G^{(S)}_{(gram)i,j} - G^{(G)}_{(gram)i,j})^2 - + J^{[l]}_style (S, G) = \\frac{1}{4 * n_c^2 * (n_H * n_W)^2} + \\sum^{n_C}_{i=1} \\sum^{c_C}_{j=1} (G^{(S)}_{(gram)i,j} - G^{(G)}_{(gram)i,j})^2 + OR - Cost(S, G) = \sum^{n_C}_{i=1} \sum^{c_C}_{j=1} (G^{(S)}_{(gram)i,j} - G^{(G)}_{(gram)i,j})^2 / ( 4 * n_c^2 * (n_H * n_W)^2 ) - + Cost(S, G) = \\sum^{n_C}_{i=1} \\sum^{c_C}_{j=1} + (G^{(S)}_{(gram)i,j} - G^{(G)}_{(gram)i,j})^2 / ( 4 * n_c^2 * (n_H * n_W)^2 ) + Args: - a_S (tensor): tensor of dimension (1, n_H, n_W, n_C), hidden layer activations representing style of the image S - a_G (tensor): tensor of dimension (1, n_H, n_W, n_C), hidden layer activations representing style of the image G - - Returns: + a_S (tensor): hidden layer activations of input image S representing style; shape is (1, n_H, n_W, n_C) + a_G (tensor): hidden layer activations of input image G representing style; shape is (1, n_H, n_W, n_C) + + Returns: (tensor): J_style_layer tensor representing a scalar value, style cost defined above by equation (2) """ - # Retrieve dimensions from a_G - m, n_H, n_W, n_C = a_G.get_shape().as_list() - + # Dimensions of a_G (we ommit the first one, which equals to 1) + n_H, n_W, n_C = a_G.get_shape().as_list()[1:] + # Reshape the images to have them of shape (n_C, n_H*n_W) a_S = tf.transpose(tf.reshape(a_S, [n_H * n_W, n_C])) a_G = tf.transpose(tf.reshape(a_G, [n_H * n_W, n_C])) @@ -118,7 +85,7 @@ def compute(cls, a_S, a_G): # Computing the loss J_style_layer = tf.reduce_sum(tf.square(GS - GG)) / ( 4 * n_C**2 * (n_H * n_W)**2) - + return J_style_layer @@ -129,35 +96,34 @@ class NSTStyleCostComputer: def compute(cls, tf_session, model_layers): """ Computes the overall style cost from several chosen layers - + Args: tf_session (tf.compat.v1.INteractiveSession): the active interactive tf session model_layers () -- our image model (probably pretrained on large dataset) STYLE_LAYERS -- A python list containing: - the names of the layers we would like to extract style from - a coefficient for each of them - - Returns: + + Returns: (tensor): J_style - tensor representing a scalar value, style cost defined above by equation (2) """ # initialize the overall style cost J_style = 0 # for layer_name, coeff in STYLE_LAYERS: - for style_layer_id, nst_style_layer in model_layers: + for _style_layer_id, nst_style_layer in model_layers: # Select the output tensor of the currently selected layer out = nst_style_layer.neurons - # out = model[layer_name] # Set a_S to be the hidden layer activation from the layer we have selected, by running the session on out a_S = tf_session.run(out) - # Set a_G to be the hidden layer activation from same layer. Here, a_G references model[layer_name] + # Set a_G to be the hidden layer activation from same layer. Here, a_G references model[layer_name] # and isn't evaluated yet. Later in the code, we'll assign the image G as the model input, so that # when we run the session, this will be the activations drawn from the appropriate layer, with G as input. a_G = out - + # Compute style_cost for the current layer J_style_layer = cls.style_layer_cost(a_S, a_G) diff --git a/src/artificial_artwork/disk_interface.py b/src/artificial_artwork/disk_interface.py index 550d613..3dd8acb 100644 --- a/src/artificial_artwork/disk_interface.py +++ b/src/artificial_artwork/disk_interface.py @@ -6,7 +6,7 @@ class DiskInterface(ABC): @staticmethod @abstractmethod - def save_image(image: NDArray, file_path: str, format=None) -> None: + def save_image(image: NDArray, file_path: str, save_format=None) -> None: raise NotImplementedError @staticmethod diff --git a/src/artificial_artwork/disk_operations.py b/src/artificial_artwork/disk_operations.py index 27d7622..2e9936f 100644 --- a/src/artificial_artwork/disk_operations.py +++ b/src/artificial_artwork/disk_operations.py @@ -6,16 +6,16 @@ class Disk(DiskInterface): """Save or load images to and from the disk.""" - + @staticmethod - def save_image(image: NDArray, file_path: str, format=None) -> None: + def save_image(image: NDArray, file_path: str, save_format=None) -> None: """Save a numpy ndarray into a file on the disk. Args: image (NDArray): the image to save into a file file_path (str): the path (on the disk) of the file """ - imageio.imsave(file_path, image, format=format) + imageio.imsave(file_path, image, format=save_format) @staticmethod def load_image(file_path: str) -> NDArray: diff --git a/src/artificial_artwork/image/__init__.py b/src/artificial_artwork/image/__init__.py index b89698c..e84ae6d 100644 --- a/src/artificial_artwork/image/__init__.py +++ b/src/artificial_artwork/image/__init__.py @@ -1 +1,6 @@ from .image_factory import ImageFactory +from .image_operations import reshape_image, subtract, noisy, convert_to_uint8 + + +__all__ = ['ImageFactory', 'reshape_image', 'subtract', 'noisy', + 'convert_to_uint8'] diff --git a/src/artificial_artwork/image/image.py b/src/artificial_artwork/image/image.py index 4350e2a..62875d0 100644 --- a/src/artificial_artwork/image/image.py +++ b/src/artificial_artwork/image/image.py @@ -5,12 +5,12 @@ @attr.s class Image: """An image loaded into memory, represented as a multidimension mathematical matrix/array. - + The 'file_path' attribute indicates a file in the disk that corresponds to the matrix Args: - file_path (str): - matrix (NDArray): + file_path (str): the file in disk that the image was loaded from + matrix (NDArray): the loaded image as mathmatical array/matrix """ file_path: str = attr.ib() matrix: NDArray = attr.ib(default=None) diff --git a/src/artificial_artwork/image/image_factory.py b/src/artificial_artwork/image/image_factory.py index 5928264..ab177e7 100644 --- a/src/artificial_artwork/image/image_factory.py +++ b/src/artificial_artwork/image/image_factory.py @@ -1,5 +1,5 @@ +from typing import Protocol, Callable, List import attr -from typing import Protocol, Any, Callable, List from numpy.typing import NDArray from .image_processor import ImageProcessor diff --git a/src/artificial_artwork/image/image_operations.py b/src/artificial_artwork/image/image_operations.py index a1e89d7..68115d9 100644 --- a/src/artificial_artwork/image/image_operations.py +++ b/src/artificial_artwork/image/image_operations.py @@ -1,6 +1,9 @@ +from typing import Tuple import numpy as np from numpy.typing import NDArray -from typing import Tuple + + +__all__ = ['reshape_image', 'subtract', 'noisy', 'convert_to_uint8'] def reshape_image(image: NDArray, shape: Tuple[int, ...]) -> NDArray: @@ -18,12 +21,12 @@ def subtract(image: NDArray, array: NDArray) -> NDArray: Returns: NDArray: [description] - """ + """ try: return image - array - except ValueError as numpy_broadcast_error: + except ValueError as numpy_broadcast_error: raise ShapeMissmatchError( - f'Expected arrays with matching shapes.') from numpy_broadcast_error + 'Expected arrays with matching shapes.') from numpy_broadcast_error class ShapeMissmatchError(Exception): pass @@ -32,12 +35,9 @@ class ShapeMissmatchError(Exception): pass def noisy(image: NDArray, ratio: float) -> NDArray: """Generates a noisy image by adding random noise to the content_image""" if ratio < 0 or 1 < ratio: - raise InvalidRatioError('Expected a ratio value x such that 0 <= x <= 1') - - prod_shape = image.shape - # assert prod_shape == (1, self.config.image_height, self.config.image_width, self.config.color_channels) + raise InvalidRatioError('Expected a ratio value x such that 0 <= x <= 1') - noise_image = np.random.uniform(-20, 20, prod_shape).astype('float32') + noise_image = np.random.uniform(-20, 20, image.shape).astype('float32') # Set the input_image to be a weighted average of the content_image and a noise_image return noise_image * ratio + image * (1 - ratio) @@ -53,26 +53,23 @@ class ImageDTypeConverter: def __call__(self, image: NDArray): return self._convert_to_uint8(image) - def _convert_to_uint8(self, im): + def _convert_to_uint8(self, image): bitdepth = 8 out_type = type(self).bit_2_data_type[bitdepth] - mi = np.nanmin(im) - ma = np.nanmax(im) - if not np.isfinite(mi): + min_pixel_value = np.nanmin(image) + max_pixel_value = np.nanmax(image) + if not np.isfinite(min_pixel_value): raise ValueError("Minimum image value is not finite") - if not np.isfinite(ma): + if not np.isfinite(max_pixel_value): raise ValueError("Maximum image value is not finite") - if ma == mi: - return im.astype(out_type) + if max_pixel_value == min_pixel_value: + return image.astype(out_type) # Make float copy before we scale - im = im.astype("float64") + im = image.astype("float64") # Scale the values between 0 and 1 then multiply by the max value - im = (im - mi) / (ma - mi) * (np.power(2.0, bitdepth) - 1) + 0.499999999 - assert np.nanmin(im) >= 0 - assert np.nanmax(im) < np.power(2.0, bitdepth) - im = im.astype(out_type) - return im + im = (im - min_pixel_value) / (max_pixel_value - min_pixel_value) * (np.power(2.0, bitdepth) - 1) + 0.499999999 + return im.astype(out_type) convert_to_uint8 = ImageDTypeConverter() diff --git a/src/artificial_artwork/nst_image.py b/src/artificial_artwork/nst_image.py index a7d681a..c649415 100644 --- a/src/artificial_artwork/nst_image.py +++ b/src/artificial_artwork/nst_image.py @@ -1,8 +1,18 @@ +from typing import Protocol import attr -from .image import ImageFactory +from numpy.typing import NDArray + +from .image import ImageFactory, reshape_image, subtract, noisy, convert_to_uint8 from .disk_operations import Disk +__all__ = ['ImageManager', 'noisy', 'convert_to_uint8'] + + +class ImageProtocol(Protocol): + path: str + matrix: NDArray + @attr.s class ImageManager: @@ -10,9 +20,19 @@ class ImageManager: image_factory: ImageFactory = \ attr.ib(init=False, default=attr.Factory(lambda: ImageFactory(Disk.load_image))) images_compatible: bool = attr.ib(init=False, default=False) - + _known_types = attr.ib(init=False, default={'content', 'style'}) + _content_image: ImageProtocol + _style_image: ImageProtocol + + @staticmethod + def default(means): + return ImageManager([ + lambda matrix: reshape_image(matrix, ((1,) + matrix.shape)), + lambda matrix: subtract(matrix, means), # input image must have 3 channels! + ]) + def __attrs_post_init__(self): for image_type in self._known_types: setattr(self, f'_{image_type}_image', None) @@ -35,17 +55,17 @@ def _set_image(self, image, image_type: str): self.images_compatible = False @property - def content_image(self): + def content_image(self) -> ImageProtocol: return self._content_image @content_image.setter - def content_image(self, image): + def content_image(self, image: ImageProtocol) -> None: self._set_image(image, 'content') - + @property - def style_image(self): + def style_image(self) -> ImageProtocol: return self._style_image @style_image.setter - def style_image(self, image): + def style_image(self, image: ImageProtocol) -> None: self._set_image(image, 'style') diff --git a/src/artificial_artwork/nst_math.py b/src/artificial_artwork/nst_math.py index 427aa9b..b79e2b5 100644 --- a/src/artificial_artwork/nst_math.py +++ b/src/artificial_artwork/nst_math.py @@ -1,18 +1,18 @@ -from typing import Union -from numpy.typing import NDArray +from typing import TypeVar import tensorflow as tf -# Define type alias -VolumeType = Union[NDArray, tf.python.framework.ops.Tensor] +# future work: narrow down the type pf matrix argument +# VolumeType = Union[NDArray, Type[tf.python.framework.ops.Tensor]] +VolumeType = TypeVar('VolumeType') -def gram_matrix(A: VolumeType) -> VolumeType: - """Compute the Gram matrix of input 2D matrix A. +def gram_matrix(matrix: VolumeType) -> VolumeType: + """Compute the Gram matrix of input 2D matrix. In Linear Algebra the Gram matrix G of a set of vectors (u_1, u_2, .. , u_n) is the matrix of dot products, whose entries are: - + G_{ij} = u^T_i * u_j = numpy.dot(u_i, u_j) OR GA = A * A^T @@ -20,9 +20,9 @@ def gram_matrix(A: VolumeType) -> VolumeType: Uses tenforflow to compute the Gram matrix of the input 2D matrix. Args: - A (type): matrix of shape (n_C, n_H * n_W) - + matrix (type): matrix of shape (n_C, n_H * n_W) + Returns: (tf.tensor): Gram matrix of A, of shape (n_C, n_C) """ - return tf.matmul(A, tf.transpose(A)) + return tf.matmul(matrix, tf.transpose(matrix)) diff --git a/src/artificial_artwork/nst_tf_algorithm.py b/src/artificial_artwork/nst_tf_algorithm.py index 55b270c..fea0cfc 100644 --- a/src/artificial_artwork/nst_tf_algorithm.py +++ b/src/artificial_artwork/nst_tf_algorithm.py @@ -1,38 +1,44 @@ +from time import time from typing import Dict import attr import tensorflow as tf -from time import time - from .tf_session_runner import TensorflowSessionRunner -from .pretrained_model import graph_factory -from .cost_computer import NSTCostComputer, NSTContentCostComputer, NSTStyleCostComputer -from .utils.notification import Subject +from .style_model import graph_factory +from .cost_computer import NSTContentCostComputer, NSTStyleCostComputer +from .utils import Subject @attr.s class NSTAlgorithmRunner: - nst_algorithm = attr.ib() session_runner = attr.ib() apply_noise = attr.ib() + # model_design = attr.ib() optimization = attr.ib(default=attr.Factory(lambda: Optimization())) + nst_algorithm = attr.ib(init=False, default=None) + parameters = attr.ib(init=False, default=None) + nn_builder = attr.ib(init=False, default=None) nn_cost_builder = attr.ib(init=False, default=None) # broadcast facilities to notify observers/listeners progress_subject = attr.ib(init=False, default=attr.Factory(Subject)) - peristance_subject = attr.ib(init=False, default=attr.Factory(Subject)) + persistance_subject = attr.ib(init=False, default=attr.Factory(Subject)) + + # NETWORK_OUTPUT = 'conv4_2' @classmethod - def default(cls, nst_algorithm, apply_noise): + def default(cls, apply_noise): session_runner = TensorflowSessionRunner.with_default_graph_reset() - return NSTAlgorithmRunner(nst_algorithm, session_runner, apply_noise) + return NSTAlgorithmRunner(session_runner, apply_noise) - def run(self): + def run(self, nst_algorithm, model_design): ## Prepare ## - c_image = self.nst_algorithm.parameters.content_image - s_image = self.nst_algorithm.parameters.style_image + self.nst_algorithm = nst_algorithm + + c_image = nst_algorithm.parameters.content_image + s_image = nst_algorithm.parameters.style_image image_specs = type('ImageSpecs', (), { 'height': c_image.matrix.shape[1], @@ -41,7 +47,8 @@ def run(self): })() print(' --- Loading CV Image Model ---') - style_network = graph_factory.create(image_specs) + + style_network = graph_factory.create(image_specs, model_design) noisy_content_image_matrix = self.apply_noise(self.nst_algorithm.parameters.content_image.matrix) @@ -53,10 +60,10 @@ def run(self): ) # indicate content_image and the output layer of the Neural Network - self.nn_builder.build_activations(c_image.matrix, 'conv4_2') + self.nn_builder.build_activations( + c_image.matrix, model_design.network_design.output_layer) self.nn_cost_builder = CostBuilder( - NSTCostComputer.compute, NSTContentCostComputer.compute, NSTStyleCostComputer.compute, ) @@ -67,17 +74,17 @@ def run(self): ) self.nn_builder.assign_input(s_image.matrix) - + # manually set the neurons attribute for each NSTStyleLayer # using the loaded cv model (which is a dict of layers) # the NSTStyleLayer ids attribute to query the dict - for style_layer_id, nst_style_layer in self.nst_algorithm.parameters.style_layers: + for style_layer_id, nst_style_layer in model_design.network_design.style_layers: nst_style_layer.neurons = style_network[style_layer_id] # TODO obviously encapsulate the above code elsewhere self.nn_cost_builder.build_style_cost( self.session_runner.session, - self.nst_algorithm.parameters.style_layers, + model_design.network_design.style_layers, ) self.nn_cost_builder.build_cost(alpha=10, beta=40) @@ -89,14 +96,15 @@ def run(self): print(' --- Preparing Iterative Learning Algorithm ---') input_image = noisy_content_image_matrix - + # Initialize global variables (you need to run the session on the initializer) self.session_runner.run(tf.compat.v1.global_variables_initializer()) # Run the noisy input image (initial generated image) through the model self.session_runner.run(style_network['input'].assign(input_image)) + self.perform_nst(style_network) - # Iterate + def perform_nst(self, style_network): print(' --- Running Iterative Algorithm ---') i = 0 @@ -118,7 +126,7 @@ def run(self): 'content-cost': Jc, 'style-cost': Js, }) - progress['metrics']['duration'] = time() - self.time_started, # in seconds + progress['metrics']['duration'] = time() - self.time_started # in seconds self._notify_progress(progress) i += 1 @@ -137,7 +145,7 @@ def run(self): def iterate(self, image_model): # Run the session on the train_step to minimize the total cost self.session_runner.run([self.optimization.train_step]) - + # Compute the generated image by running the session on the current model['input'] generated_image = self.session_runner.run(image_model['input']) return generated_image @@ -154,8 +162,8 @@ def _progress(self, generated_image, completed_iterations: int) -> Dict: } def _notify_persistance(self, progress): - self.peristance_subject.state = type('SubjectState', (), progress) - self.peristance_subject.notify() + self.persistance_subject.state = type('SubjectState', (), progress) + self.persistance_subject.notify() def _notify_progress(self, progress): # set subject with the appropriate state to broadcast @@ -194,8 +202,8 @@ class NeuralNetBuilder: a_G = attr.ib(init=False, default=None) def assign_input(self, image): - # Assign the content image to be the input of the VGG model. - self.session.run(self.model['input'].assign(image)) + # Assign the content image to be the input of the VGG model. + self.session.run(self.model['input'].assign(image)) def build_activations(self, content_image, model_layer_id: str): self.assign_input(content_image) @@ -210,7 +218,7 @@ def _setup_activations(self): # Set a_C to be the hidden layer activation from the layer we have selected self.a_C = self.session.run(self.output_neurons) - # Set a_G to be the hidden layer activation from same layer. Here, a_G references model['conv4_2'] + # Set a_G to be the hidden layer activation from same layer. Here, a_G references model['conv4_2'] # and isn't evaluated yet. Later in the code, we'll assign the image G as the model input, so that # when we run the session, this will be the activations drawn from the appropriate layer, with G as input. self.a_G = self.output_neurons @@ -218,7 +226,6 @@ def _setup_activations(self): @attr.s class CostBuilder: - cost_function = attr.ib() compute_content_cost = attr.ib() compute_style_cost = attr.ib() @@ -237,7 +244,28 @@ def build_style_cost(self, tf_session, style_layers): self.style_cost = self.compute_style_cost(tf_session, style_layers) def build_cost(self, alpha=10, beta=40): - self.cost = self.cost_function(self.content_cost, self.style_cost, alpha=alpha, beta=beta) + """Build the function of the Total Cost (loss function). + + The Total Cost function J(G) (learning error) is the linear combination + of the 'content cost' (J_content) and 'style cost' (J_style). + + After invoking this method the Cost Function is accessible via the + 'cost' attribute. + + Total cost = alpha * J_content + beta * J_style + + Or mathematically expressed as: + + J(G) = alpha * J_content(C, G) + beta * J_style(S, G) + + where G: Generated Image, C: Content Image, S: Style Image + and J, J_content, J_style are mathematical functions + + Args: + alpha (float, optional): hyperparameter to weight content cost. Defaults to 10. + beta (float, optional): hyperparameter to weight style cost. Defaults to 40. + """ + self.cost = alpha * self.content_cost + beta * self.style_cost @attr.s diff --git a/src/artificial_artwork/pre_trained_models/__init__.py b/src/artificial_artwork/pre_trained_models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/artificial_artwork/pre_trained_models/vgg.py b/src/artificial_artwork/pre_trained_models/vgg.py new file mode 100644 index 0000000..46c5a1a --- /dev/null +++ b/src/artificial_artwork/pre_trained_models/vgg.py @@ -0,0 +1,44 @@ +from typing import Tuple, Dict +from numpy.typing import NDArray +import scipy.io + + +from artificial_artwork.pretrained_model.model_routines import PretrainedModelRoutines +from artificial_artwork.pretrained_model import ModelHandlerFacility, Modelhandler + + +class VggModelRoutines(PretrainedModelRoutines): + + def load_layers(self, file_path: str) -> NDArray: + return scipy.io.loadmat(file_path)['layers'][0] + + def get_id(self, layer: NDArray) -> str: + return layer[0][0][0][0] + + def get_layers_dict(self, layers: NDArray) -> Dict[str, NDArray]: + return {self.get_id(layer): layers[index] for index, layer in enumerate(layers)} + + def get_weights(self, layer: NDArray) -> Tuple[NDArray, NDArray]: + return layer[0][0][2][0][0], layer[0][0][2][0][1] + + +vgg_model_routines = VggModelRoutines() + + +@ModelHandlerFacility.factory.register_as_subclass('vgg') +class VggModelHandler(Modelhandler): + + @property + def environment_variable(self) -> str: + return 'AA_VGG_19' + + @property + def model_routines(self) -> VggModelRoutines: + return vgg_model_routines + + @property + def model_load_exception_text(self) -> str: + return 'No pretrained image model found. ' \ + f'Please download it and set the {self.environment_variable} ' \ + 'environment variable with the path where you stored the model ' \ + '(*.mat file), to instruct the program where to locate and load it' diff --git a/src/artificial_artwork/pretrained_model/__init__.py b/src/artificial_artwork/pretrained_model/__init__.py index c221c94..13c3c7f 100644 --- a/src/artificial_artwork/pretrained_model/__init__.py +++ b/src/artificial_artwork/pretrained_model/__init__.py @@ -1 +1,4 @@ -from .model_loader import GraphFactory as graph_factory +from .model_handler import ModelHandlerFacility, Modelhandler + + +__all__ = ['ModelHandlerFacility', 'Modelhandler'] diff --git a/src/artificial_artwork/pretrained_model/layers_getter.py b/src/artificial_artwork/pretrained_model/layers_getter.py index fd30021..41604cd 100644 --- a/src/artificial_artwork/pretrained_model/layers_getter.py +++ b/src/artificial_artwork/pretrained_model/layers_getter.py @@ -1,15 +1,18 @@ +from typing import Callable, Tuple, Dict from numpy.typing import NDArray -import attr +from attr import define -from .vgg_architecture import LAYERS +@define +class ModelReporter: + _layer_id_2_layer: Dict[str, NDArray] + _weights_extractor: Callable[[NDArray], Tuple[NDArray, NDArray]] -@attr.s -class VggLayersGetter: - vgg_layers: NDArray = attr.ib() - _vgg_layer_id_2_layer = attr.ib(init=False, - default=attr.Factory(lambda self: {layer_id: self.vgg_layers[index] for index, layer_id in enumerate(LAYERS)}, takes_self=True)) - - @property - def id_2_layer(self): - return self._vgg_layer_id_2_layer + def layer(self, layer_id: str) -> NDArray: + return self._layer_id_2_layer[layer_id] + + def weights(self, layer: NDArray) -> Tuple[NDArray, NDArray]: + return self._weights_extractor(layer) + + def get_weights(self, layer_id: str) -> Tuple[NDArray, NDArray]: + return self.weights(self.layer(layer_id)) diff --git a/src/artificial_artwork/pretrained_model/model_handler.py b/src/artificial_artwork/pretrained_model/model_handler.py new file mode 100644 index 0000000..93bff35 --- /dev/null +++ b/src/artificial_artwork/pretrained_model/model_handler.py @@ -0,0 +1,62 @@ +import os +from typing import Tuple, Protocol +from numpy.typing import NDArray + +from artificial_artwork.utils import SubclassRegistry +from .model_handler_interface import ModelHandlerInterface +from .layers_getter import ModelReporter + + +class ReporterProtocol(Protocol): + def get_weights(self, layer_id: str) -> Tuple[NDArray, NDArray]: ... + + +class Modelhandler(ModelHandlerInterface): + _reporter: ReporterProtocol + def __init__(self): + self._reporter = None + + @property + def reporter(self) -> ReporterProtocol: + return self._reporter + + @reporter.setter + def reporter(self, layers) -> None: + self._reporter = self._create_reporter(layers) + + def _create_reporter(self, layers: NDArray) -> ReporterProtocol: + return ModelReporter( + self.model_routines.get_layers_dict(layers), + self.model_routines.get_weights + ) + + def load_model_layers(self) -> NDArray: + layers = self._load_model_layers() + self._reporter = self._create_reporter(layers) + return layers + + def _load_model_layers(self) -> NDArray: + try: + return self.model_routines.load_layers(os.environ[self.environment_variable]) + except KeyError as variable_not_found: + raise NoImageModelSpesifiedError(self.model_load_exception_text) \ + from variable_not_found + + +class NoImageModelSpesifiedError(Exception): pass + + +class ModelHandlerFactoryMeta(SubclassRegistry[Modelhandler]): pass + + +class ModelHandlerFactory(metaclass=ModelHandlerFactoryMeta): pass + + +class ModelHandlerFacility: + # routines_interface: type = PretrainedModelRoutines + handler_class: type = Modelhandler + factory = ModelHandlerFactory + + @classmethod + def create(cls, handler_type, *args, **kwargs) -> Modelhandler: + return cls.factory.create(handler_type, *args, **kwargs) diff --git a/src/artificial_artwork/pretrained_model/model_handler_interface.py b/src/artificial_artwork/pretrained_model/model_handler_interface.py new file mode 100644 index 0000000..0f8807b --- /dev/null +++ b/src/artificial_artwork/pretrained_model/model_handler_interface.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +from typing import Dict, Protocol, Tuple +from numpy.typing import NDArray + + +class PretrainedModelRoutinesCapable(Protocol): + def load_layers(self, file_path: str) -> NDArray: ... + + def get_id(self, layer: NDArray) -> str: ... + + def get_layers_dict(self, layers: NDArray) -> Dict[str, NDArray]: ... + + def get_weights(self, layer: NDArray) -> Tuple[NDArray, NDArray]: ... + + +class ModelHandlerInterface(ABC): + + @property + @abstractmethod + def environment_variable(self) -> str: + raise NotImplementedError + + @property + @abstractmethod + def model_routines(self) -> PretrainedModelRoutinesCapable: + raise NotImplementedError + + @property + @abstractmethod + def model_load_exception_text(self) -> str: + raise NotImplementedError \ No newline at end of file diff --git a/src/artificial_artwork/pretrained_model/model_loader.py b/src/artificial_artwork/pretrained_model/model_loader.py deleted file mode 100644 index 7b1691a..0000000 --- a/src/artificial_artwork/pretrained_model/model_loader.py +++ /dev/null @@ -1,146 +0,0 @@ -### Part of this code is due to the MatConvNet team and is used to load the parameters of the pretrained VGG19 model in the notebook ### - -import os -import re -from typing import Dict, Tuple, Any, Protocol - -import attr -from numpy.typing import NDArray -import numpy as np -import scipy.io -import tensorflow as tf - -from .layers_getter import VggLayersGetter -from .image_model import LAYERS as NETWORK_DESIGN - - -class ImageSpecs(Protocol): - width: int - height: int - color_channels: int - - -def load_vgg_model_parameters(path: str) -> Dict[str, NDArray]: - return scipy.io.loadmat(path) - - -class NoImageModelSpesifiedError(Exception): pass - - -def get_vgg_19_model_path(): - try: - return os.environ['AA_VGG_19'] - except KeyError as variable_not_found: - raise NoImageModelSpesifiedError('No pretrained image model found. ' - 'Please download it and set the AA_VGG_19 environment variable with the' - 'path where ou stored the model (*.mat file), to indicate to wher to ' - 'locate and load it') from variable_not_found - - -def load_default_model_parameters(): - path = get_vgg_19_model_path() - return load_vgg_model_parameters(path) - - -def get_layers(model_parameters: Dict[str, NDArray]) -> NDArray: - return model_parameters['layers'][0] - - -class GraphBuilder: - - def __init__(self): - self.graph = {} - self._prev_layer = None - - def _build_layer(self, layer_id: str, layer): - self.graph[layer_id] = layer - self._prev_layer = layer - return self - - def input(self, width: int, height: int, nb_channels=3, dtype='float32', layer_id='input'): - self.graph = {} - return self._build_layer(layer_id, tf.Variable(np.zeros((1, height, width, nb_channels)), dtype=dtype)) - - def avg_pool(self, layer_id: str): - return self._build_layer(layer_id, tf.nn.avg_pool(self._prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')) - - def relu_conv_2d(self, layer_id: str, layer_weights): - """A Relu wrapped around a convolutional layer. - - Will use the layer_id to find weight (for W and b matrices) values in - the pretrained model (layer). - - Also uses the layer_id to as dict key to the output graph. - """ - W, b = layer_weights - return self._build_layer(layer_id, tf.nn.relu(self._conv_2d(W, b))) - - def _conv_2d(self, W: NDArray, b: NDArray): - W = tf.constant(W) - b = tf.constant(np.reshape(b, (b.size))) - return tf.compat.v1.nn.conv2d(self._prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b - - -@attr.s -class ModelParameters: - params = attr.ib(default=attr.Factory(load_default_model_parameters)) - - -class GraphFactory: - builder = GraphBuilder() - - @classmethod - def weights(cls, layer: NDArray) -> Tuple[NDArray, NDArray]: - """Get the weights and bias for a given layer of the VGG model.""" - # wb = vgg_layers[0][layer][0][0][2] - wb = layer[0][0][2] - W = wb[0][0] - b = wb[0][1] - return W, b - - @classmethod - def create(cls, config: ImageSpecs, model_parameters=None) -> Dict[str, Any]: - """Create a model for the purpose of 'painting'/generating a picture. - - Creates a Deep Learning Neural Network with most layers having weights - (aka model parameters) with values extracted from a pre-trained model - (ie another neural network trained on an image dataset suitably). - - Args: - config ([type]): [description] - model_parameters ([type], optional): [description]. Defaults to None. - - Returns: - Dict[str, Any]: [description] - """ - - vgg_model_parameters = ModelParameters(*list(filter(None, [model_parameters]))) - - vgg_layers = get_layers(vgg_model_parameters.params) - - layer_getter = VggLayersGetter(vgg_layers) - - def relu(layer_id: str): - return cls.builder.relu_conv_2d(layer_id, cls.weights(layer_getter.id_2_layer[layer_id])) - - layer_callbacks = { - 'conv': relu, - 'avgpool': cls.builder.avg_pool - } - - def layer(layer_id: str): - matched_string = re.match(r'(\w+?)[\d_]*$', layer_id).group(1) - return layer_callbacks[matched_string](layer_id) - - ## Build Graph - - # each relu_conv_2d uses pretrained model's layer weights for W and b matrices - # each average pooling layer uses custom weight values - # all weights are guaranteed to remain constant (see GraphBuilder._conv_2d method) - - # cls.builder.input(config.image_width, config.image_height, nb_channels=config.color_channels) - cls.builder.input(config.width, config.height, nb_channels=config.color_channels) - for layer_id in NETWORK_DESIGN: - layer(layer_id) - - return cls.builder.graph diff --git a/src/artificial_artwork/pretrained_model/model_routines.py b/src/artificial_artwork/pretrained_model/model_routines.py new file mode 100644 index 0000000..dabcffa --- /dev/null +++ b/src/artificial_artwork/pretrained_model/model_routines.py @@ -0,0 +1,69 @@ +"""This modules defines the interface which must be implemented in order to +utilize a pretrained model and its weights""" + +from abc import ABC, abstractmethod +from typing import Tuple, Dict +from numpy.typing import NDArray + + +class PretrainedModelRoutines(ABC): + """Set of routines that are required in order to use a pretrained model for nst.""" + + @abstractmethod + def load_layers(self, file_path: str) -> NDArray: + """Load a pretrained model from disk. + + Loads the model parameters, given the path to a file in the disk, that + indicated where the pretrained model is. + + Args: + file_path (str): the path corresponding to a file in the disk + + Returns: + NDArray: the model parameters as a numpy array + """ + raise NotImplementedError + + @abstractmethod + def get_id(self, layer: NDArray) -> str: + """Get the id of a model's network layer. + + The pretrained model being a neural network has a specific architecture + and each layer should a unique string id that one can reference it. + + Args: + layer (NDArray): the layer of a pretrained neural network model + + Returns: + str: the layer id + """ + raise NotImplementedError + + @abstractmethod + def get_layers_dict(self, layers: NDArray) -> Dict[str, NDArray]: + """Get a dict mapping strings to pretrained model layers. + + Args: + layers (NDArray): the pretrained model layers + + Returns: + Dict[str, NDArray]: the dictionary mapping strings to layers + """ + raise NotImplementedError + + @abstractmethod + def get_weights(self, layer: NDArray) -> Tuple[NDArray, NDArray]: + """Get the values of the weights of a given network layer. + + Each pretrained model network layer has "learned" certain parameters in + the form of "weights" (ie weight matrices A and b in equation Ax + b). + + Call this method to get a tuple of the A and b mathematical matrices. + + Args: + layer (NDArray): the layer of a pretrained neural network model + + Returns: + Tuple[NDArray, NDArray]: the weights in matrix A and b + """ + raise NotImplementedError diff --git a/src/artificial_artwork/pretrained_model/vgg_architecture.py b/src/artificial_artwork/pretrained_model/vgg_architecture.py deleted file mode 100644 index b829c64..0000000 --- a/src/artificial_artwork/pretrained_model/vgg_architecture.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -""" -def vgg_layers(): - """The network's layer structure of the vgg image model.""" - return ( - (0, 'conv1_1') , # (3, 3, 3, 64) - (1, 'relu1_1') , - (2, 'conv1_2') , # (3, 3, 64, 64) - (3, 'relu1_2') , - (4, 'pool1') , - (5, 'conv2_1') , # (3, 3, 64, 128) - (6, 'relu2_1') , - (7, 'conv2_2') , # (3, 3, 128, 128) - (8, 'relu2_2') , - (9, 'pool2') , - (10, 'conv3_1'), # (3, 3, 128, 256) - (11, 'relu3_1'), - (12, 'conv3_2'), # (3, 3, 256, 256) - (13, 'relu3_2'), - (14, 'conv3_3'), # (3, 3, 256, 256) - (15, 'relu3_3'), - (16, 'conv3_4'), # (3, 3, 256, 256) - (17, 'relu3_4'), - (18, 'pool3') , - (19, 'conv4_1'), # (3, 3, 256, 512) - (20, 'relu4_1'), - (21, 'conv4_2'), # (3, 3, 512, 512) - (22, 'relu4_2'), - (23, 'conv4_3'), # (3, 3, 512, 512) - (24, 'relu4_3'), - (25, 'conv4_4'), # (3, 3, 512, 512) - (26, 'relu4_4'), - (27, 'pool4') , - (28, 'conv5_1'), # (3, 3, 512, 512) - (29, 'relu5_1'), - (30, 'conv5_2'), # (3, 3, 512, 512) - (31, 'relu5_2'), - (32, 'conv5_3'), # (3, 3, 512, 512) - (33, 'relu5_3'), - (34, 'conv5_4'), # (3, 3, 512, 512) -# 35 is relu -# 36 is maxpool -# 37 is fullyconnected (7, 7, 512, 4096) -# 38 is relu -# 39 is fullyconnected (1, 1, 4096, 4096) -# 40 is relu -# 41 is fullyconnected (1, 1, 4096, 1000) -# 42 is softmax - ) - - -LAYERS = tuple((layer_id for _, layer_id in vgg_layers())) diff --git a/src/artificial_artwork/production_networks/__init__.py b/src/artificial_artwork/production_networks/__init__.py new file mode 100644 index 0000000..cd690dd --- /dev/null +++ b/src/artificial_artwork/production_networks/__init__.py @@ -0,0 +1,32 @@ +from typing import Iterable, Tuple +import attr + +from .style_layer_selector import NSTLayersSelection + +# just to be compatible with current mypy +def style_layers(tuples: Iterable[Tuple[str, float]]): + return NSTLayersSelection.from_tuples(tuples) + + +@attr.s +class NetworkDesign: + network_layers: Tuple[str] = attr.ib() + style_layers: NSTLayersSelection = attr.ib(converter=style_layers) + output_layer: str = attr.ib() + + @classmethod + def from_default_vgg(cls): + from .image_model import LAYERS + + STYLE_LAYERS = ( + ('conv1_1', 0.2), + ('conv2_1', 0.2), + ('conv3_1', 0.2), + ('conv4_1', 0.2), + ('conv5_1', 0.2), + ) + return NetworkDesign( + LAYERS, + STYLE_LAYERS, + 'conv4_2', + ) diff --git a/src/artificial_artwork/pretrained_model/image_model.py b/src/artificial_artwork/production_networks/image_model.py similarity index 100% rename from src/artificial_artwork/pretrained_model/image_model.py rename to src/artificial_artwork/production_networks/image_model.py diff --git a/src/artificial_artwork/style_layer_selector.py b/src/artificial_artwork/production_networks/style_layer_selector.py similarity index 79% rename from src/artificial_artwork/style_layer_selector.py rename to src/artificial_artwork/production_networks/style_layer_selector.py index f2cbc13..c979cf0 100644 --- a/src/artificial_artwork/style_layer_selector.py +++ b/src/artificial_artwork/production_networks/style_layer_selector.py @@ -1,5 +1,5 @@ +from typing import List, Tuple, Iterable, Protocol import attr -from typing import List, Tuple, Iterable @attr.s @@ -7,8 +7,8 @@ class NSTStyleLayer: id: str = attr.ib() coefficient: float = attr.ib() @coefficient.validator - def validate_wight(self, attribute, value): - if value <= 0 or 1 <= value: + def validate_weight(self, attribute, value): + if value <= 0 or 1 < value: raise ValueError(f'Coefficient must be a number between 0 and 1. Given {value}') neurons = attr.ib(default=None) @@ -19,18 +19,22 @@ def validate_layers(layers): raise ValueError(f'Duplicate layers found in the selection: [{", ".join(layer_ids)}]') +class StyleLayerSelectionProtocol(Protocol): + layers: List[NSTStyleLayer] + @attr.s class NSTLayersSelection: _layers: List[NSTStyleLayer] = attr.ib(validator=lambda self, attribute, layers: validate_layers(layers)) @classmethod - def from_tuples(cls, layers: List[Tuple[str, float]]): + def from_tuples(cls, layers: Iterable[Tuple[str, float]]) -> StyleLayerSelectionProtocol: return NSTLayersSelection([NSTStyleLayer(*layer) for layer in layers]) + # return NSTLayersSelection([NSTStyleLayer(layer[0], layer[1]) for layer in layers]) @property def layers(self) -> List[NSTStyleLayer]: return self._layers - + @layers.setter def layers(self, layers) -> None: """Set the Style Layers selection. @@ -50,13 +54,3 @@ def __getitem__(self, index) -> NSTStyleLayer: def __iter__(self) -> Iterable[Tuple[str, NSTStyleLayer]]: return iter(((layer.id, layer) for layer in self._layers)) - - -# Production Style Layers selection - - # ('conv1_1', 0.2), - # ('conv2_1', 0.2), - # ('conv3_1', 0.2), - # ('conv4_1', 0.2), - # ('conv5_1', 0.2) - # ] diff --git a/src/artificial_artwork/style_model/__init__.py b/src/artificial_artwork/style_model/__init__.py new file mode 100644 index 0000000..8d20377 --- /dev/null +++ b/src/artificial_artwork/style_model/__init__.py @@ -0,0 +1,3 @@ +from .graph_factory import GraphFactory as graph_factory + +__all__ = ['graph_factory'] diff --git a/src/artificial_artwork/style_model/graph_builder.py b/src/artificial_artwork/style_model/graph_builder.py new file mode 100644 index 0000000..fe2f142 --- /dev/null +++ b/src/artificial_artwork/style_model/graph_builder.py @@ -0,0 +1,41 @@ +from typing import Tuple +import numpy as np +from numpy.typing import NDArray +import tensorflow as tf + + +class GraphBuilder: + + def __init__(self): + self.graph = {} + self._prev_layer = None + + def _build_layer(self, layer_id: str, layer): + self.graph[layer_id] = layer + self._prev_layer = layer + return self + + def input(self, image_specs): + self.graph = {} + return self._build_layer('input', tf.Variable(np.zeros( + (1, image_specs.height, image_specs.width, image_specs.color_channels)), dtype='float32')) + + def avg_pool(self, layer_id: str): + return self._build_layer(layer_id, + tf.nn.avg_pool(self._prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')) + + def relu_conv_2d(self, layer_id: str, layer_weights: Tuple[NDArray, NDArray]): + """A Relu wrapped around a convolutional layer. + + Will use the layer_id to find weight (for W and b matrices) values in + the pretrained model (layer). + + Also uses the layer_id to as dict key to the output graph. + """ + W, b = layer_weights + return self._build_layer(layer_id, tf.nn.relu(self._conv_2d(W, b))) + + def _conv_2d(self, W: NDArray, b: NDArray): + W = tf.constant(W) + b = tf.constant(np.reshape(b, (b.size))) + return tf.compat.v1.nn.conv2d(self._prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b diff --git a/src/artificial_artwork/style_model/graph_factory.py b/src/artificial_artwork/style_model/graph_factory.py new file mode 100644 index 0000000..559b31d --- /dev/null +++ b/src/artificial_artwork/style_model/graph_factory.py @@ -0,0 +1,77 @@ +import re +from typing import Dict, Protocol, Any, Iterable +import attr +from numpy.typing import NDArray + +from .graph_builder import GraphBuilder + + +ModelParameters = Dict[str, NDArray] + +class ImageSpecs(Protocol): + width: int + height: int + color_channels: int + + +class GraphFactory: + builder = GraphBuilder() + + @classmethod + def create(cls, config: ImageSpecs, model_design) -> Dict[str, Any]: + """Create a model for the purpose of 'painting'/generating a picture. + + Creates a Deep Learning Neural Network with most layers having weights + (aka model parameters) with values extracted from a pre-trained model + (ie another neural network trained on an image dataset suitably). + + Args: + config ([type]): [description] + model_parameters ([type], optional): [description]. Defaults to None. + + Returns: + Dict[str, Any]: [description] + """ + # each relu_conv_2d uses pretrained model's layer weights for W and b matrices + # each average pooling layer uses custom weight values + # all weights are guaranteed to remain constant (see GraphBuilder._conv_2d method) + + cls.builder.input(config) + LayerMaker( + cls.builder, + model_design.pretrained_model.reporter, + ).make_layers(model_design.network_design.network_layers) + + return cls.builder.graph + + +@attr.s +class LayerMaker: + graph_builder = attr.ib() + reporter = attr.ib() + + layer_callbacks = attr.ib(init=False, default=attr.Factory(lambda self: { + 'conv': self.relu, + 'avgpool': self.graph_builder.avg_pool + }, takes_self=True) + ) + regex = attr.ib(init=False, default=re.compile(r'(\w+?)[\d_]*$')) + + def relu(self, layer_id: str): + return self.graph_builder.relu_conv_2d(layer_id, self.reporter.get_weights(layer_id)) + + def layer(self, layer_id: str): + match_instance = self.regex.match(layer_id) + if match_instance is not None: + return self.layer_callbacks[match_instance.group(1)](layer_id) + raise UnknownLayerError( + f"Failed to construct layer '{layer_id}'. Supported layers are " + f"[{', '.join((k for k in self.layer_callbacks))}] and regex" + f"used to parse the layer is '{self.regex.pattern}'") + + def make_layers(self, layers: Iterable[str]): + for layer_id in layers: + self.layer(layer_id) + + +class UnknownLayerError(Exception): pass diff --git a/src/artificial_artwork/styling_observer.py b/src/artificial_artwork/styling_observer.py index cc2fca5..2659364 100644 --- a/src/artificial_artwork/styling_observer.py +++ b/src/artificial_artwork/styling_observer.py @@ -1,9 +1,10 @@ -from attr import define -from .utils.notification import Observer +import os from typing import Callable -import numpy.typing as npt +from attr import define import numpy as np -import os +import numpy.typing as npt + +from .utils import Observer @define @@ -22,9 +23,10 @@ def update(self, *args, **kwargs): iterations_completed = args[0].state.metrics['iterations'] matrix = args[0].state.matrix - # Impelement handling of the request to persist with a chain of responsibility design pattern - # it suit since we do not knw how many checks and/or image transformation will be required before - # saving on disk + # Future work: Impelement handling of the "request to persist" with a + # chain of responsibility design pattern. It suits this case since we + # do not know how many checks and/or image transformation will be + # required before saving on disk output_file_path = os.path.join( output_dir, @@ -37,27 +39,11 @@ def update(self, *args, **kwargs): if str(matrix.dtype) != 'uint8': matrix = self.convert_to_unit8(matrix) - self.save_on_disk_callback(matrix, output_file_path, format='png') - - # bit_2_data_type = {8: np.uint8} - - # def _convert_to_uint8(self, im): - # bitdepth = 8 - # out_type = type(self).bit_2_data_type[bitdepth] - # mi = np.nanmin(im) - # ma = np.nanmax(im) - # if not np.isfinite(mi): - # raise ValueError("Minimum image value is not finite") - # if not np.isfinite(ma): - # raise ValueError("Maximum image value is not finite") - # if ma == mi: - # return im.astype(out_type) - - # # Make float copy before we scale - # im = im.astype("float64") - # # Scale the values between 0 and 1 then multiply by the max value - # im = (im - mi) / (ma - mi) * (np.power(2.0, bitdepth) - 1) + 0.499999999 - # assert np.nanmin(im) >= 0 - # assert np.nanmax(im) < np.power(2.0, bitdepth) - # im = im.astype(out_type) - # return im + if np.nanmin(matrix) < 0: + raise ImageDataValueError('Generated Image has pixel(s) with negative values.') + if np.nanmax(matrix) >= np.power(2.0, 8): + raise ImageDataValueError('Generated Image has pixel(s) with value >= 255.') + self.save_on_disk_callback(matrix, output_file_path, save_format='png') + + +class ImageDataValueError(Exception): pass diff --git a/src/artificial_artwork/termination_condition/__init__.py b/src/artificial_artwork/termination_condition/__init__.py index e69de29..5ee96c7 100644 --- a/src/artificial_artwork/termination_condition/__init__.py +++ b/src/artificial_artwork/termination_condition/__init__.py @@ -0,0 +1,3 @@ +from .termination_condition import TerminationConditionFacility + +__all__ = ['TerminationConditionFacility'] diff --git a/src/artificial_artwork/termination_condition/termination_condition.py b/src/artificial_artwork/termination_condition/termination_condition.py index 88d37a7..1a42b48 100644 --- a/src/artificial_artwork/termination_condition/termination_condition.py +++ b/src/artificial_artwork/termination_condition/termination_condition.py @@ -1,16 +1,18 @@ import attr -from artificial_artwork.utils.subclass_registry import SubclassRegistry +from artificial_artwork.utils import SubclassRegistry from .termination_condition_interface import TerminationConditionInterface -# TODO: learn how to use the Abstract class implementing a generic interface -# and then inherit from the Abstract class -# T = TypeVar('T') +# Future work: use an Abstract class while also inheriting from a generic +# interface +# class AbstractTerminationCondition(TerminationConditionInterface, Generic[T]) -# class AbstractTerminationCondition(TerminationConditionInterface, Generic[T]): pass + +class TerminationFactoryMeta(SubclassRegistry[TerminationConditionInterface]): + pass -class TerminationFactory(metaclass=SubclassRegistry): +class TerminationFactory(metaclass=TerminationFactoryMeta): pass diff --git a/src/artificial_artwork/termination_condition_adapter.py b/src/artificial_artwork/termination_condition_adapter.py index 5372427..848d37c 100644 --- a/src/artificial_artwork/termination_condition_adapter.py +++ b/src/artificial_artwork/termination_condition_adapter.py @@ -1,84 +1,65 @@ from abc import ABC from typing import Callable, Protocol, Dict, Any from types import MethodType -from .termination_condition.termination_condition_interface import TerminationConditionInterface -from .utils.memoize import ObjectsPool +from .utils import ObjectsPool class MetricsCapable(Protocol): metrics: Dict[str, Any] -__all__ = ['TerminationConditionAdapterFactory'] + +class TerminationConditionProtocol(Protocol): + def satisfied(self, progress: Any) -> bool: ... class AbstractTerminationConditionAdapter(ABC): - termination_condition: TerminationConditionInterface + termination_condition: TerminationConditionProtocol update: Callable[[MetricsCapable], None] + runtime_state: Any + mapping: Dict[str, Dict] def __new__(cls, termination_condition, *args, **kwargs): instance = super().__new__(cls, *args, **kwargs) instance.termination_condition = termination_condition - instance.update = 1 - # instance.update = MethodType(cls._update_callback(cls.adapter_type), instance.update) instance.runtime_state = cls._initial_state_callback(cls.adapter_type)() + instance.update = MethodType(type(instance)._update_callback(type(instance).adapter_type), instance) return instance - def __init__(self, *args, **kwargs): - # TODO move all code in __new__ - self.update = MethodType(type(self)._update_callback(type(self).adapter_type), self) - # setattr(self, attribute.name, types.MethodType(method, self)) - @classmethod - def _update_callback(cls, type: str): + def _update_callback(cls, termination_type: str): def update(self, *args, **kwargs) -> None: - self.runtime_state = args[0].state.metrics[cls.mapping[type]['key_name']] + self.runtime_state = args[0].state.metrics[cls.mapping[termination_type]['key_name']] return update @classmethod - def _initial_state_callback(cls, type: str): + def _initial_state_callback(cls, termination_type: str): def get_initial_state(): - return cls.mapping[type]['state'] + return cls.mapping[termination_type]['state'] return get_initial_state @property def satisfied(self): return self.termination_condition.satisfied(self.runtime_state) - -# Define Metaclass -class TerminationConditionAdapterType(type): - - def __new__(mcs, *args, **kwargs): - # termination_condition_adapter_class = super().__new__(mcs, 'TerminationConditionAdapter', (AbstractTerminationConditionAdapter,), {}) - termination_condition_adapter_class = type('TerminationConditionAdapter', (AbstractTerminationConditionAdapter,), {}) - termination_condition_adapter_class.adapter_type = args[0] - - # The (outer) keys are usable by client code to select termination condition - # Each (inner) 'key_name' points to the name to use to query the subject dict - # Each 'state' key points to a value that should be used to initialize - # the 'runtime_state' attribute [per termination condition (adapter)] - termination_condition_adapter_class.mapping = { +class BaseTerminationConditionAdapter: pass + + +def new_class(adapter_type: str): + return type('TerminationConditionAdapterC', (AbstractTerminationConditionAdapter,), { + 'adapter_type': adapter_type, + 'mapping': { 'max-iterations': {'key_name': 'iterations', 'state': 0}, 'convergence': {'key_name': 'cost', 'state': float('inf')}, 'time-limit': {'key_name': 'duration', 'state': 0}, } - return termination_condition_adapter_class + }) - # Investigate usage of __init__ to verify the above behaviour can be replicated with __init__ class TerminationConditionAdapterClassFactory: """Acts as a proxy to the the 'class maker' function by returning a memoized class.""" - classes_pool = ObjectsPool.new_empty(TerminationConditionAdapterType) + classes_pool = ObjectsPool.new_empty(new_class) @classmethod def create(cls, adapter_type: str): return cls.classes_pool.get_object(adapter_type) - - -class TerminationConditionAdapterFactory: - - @classmethod - def create(cls, adapter_type: str, *args, **kwargs): - dynamic_class = TerminationConditionAdapterClassFactory.create(adapter_type) - return dynamic_class(*args, **kwargs) diff --git a/src/artificial_artwork/termination_condition_adapter_factory.py b/src/artificial_artwork/termination_condition_adapter_factory.py new file mode 100644 index 0000000..1842497 --- /dev/null +++ b/src/artificial_artwork/termination_condition_adapter_factory.py @@ -0,0 +1,11 @@ +from .termination_condition_adapter import TerminationConditionAdapterClassFactory +from .termination_condition import TerminationConditionFacility + + +class TerminationConditionAdapterFactory: + + @classmethod + def create(cls, adapter_type: str, *args): + dynamic_class = TerminationConditionAdapterClassFactory.create(adapter_type) + termination_condition = TerminationConditionFacility.create(adapter_type, *args) + return dynamic_class(termination_condition) diff --git a/src/artificial_artwork/tf_session_runner.py b/src/artificial_artwork/tf_session_runner.py index 2776d55..0a4adf7 100644 --- a/src/artificial_artwork/tf_session_runner.py +++ b/src/artificial_artwork/tf_session_runner.py @@ -2,7 +2,7 @@ from typing import List import tensorflow as tf -from .utils.proxy import RealSubject, Proxy +from .utils import RealSubject, Proxy class TensorflowSessionRunnerSubject(RealSubject): @@ -19,12 +19,15 @@ def __init__(self, real_subject) -> None: self.args_history: List[str] = [] def request(self, *args, **kwargs): - self.args_history.append(f"ARGS: [{', '.join((str(_) for _ in args))}], KWARGS: [{', '.join((f'{k}={v}' for k, v in kwargs.items()))}]") + args_str = f"[{', '.join((str(_) for _ in args))}]" + kwargs_str = f"[{', '.join((f'{k}={v}' for k, v in kwargs.items()))}]" + self.args_history.append(f"ARGS: {args_str}, KWARGS: {kwargs_str}") try: # We know that Proxy executes request by executing the request method on the subject return super().request(*args, **kwargs) - except Exception as e: - raise e + except Exception as tensorflow_error: + raise TensorflowSessionRunError('Tensorflow error occured, when' + f'running session with input args {args_str} and kwargs {kwargs_str}') from tensorflow_error @property def session(self): @@ -39,3 +42,6 @@ def with_default_graph_reset(cls): tf.compat.v1.disable_eager_execution() return TensorflowSessionRunner(TensorflowSessionRunnerSubject( tf.compat.v1.InteractiveSession())) + + +class TensorflowSessionRunError(Exception): pass diff --git a/src/artificial_artwork/utils/__init__.py b/src/artificial_artwork/utils/__init__.py index e69de29..82c1d7f 100644 --- a/src/artificial_artwork/utils/__init__.py +++ b/src/artificial_artwork/utils/__init__.py @@ -0,0 +1,8 @@ +from .notification import Observer, Subject +from .memoize import ObjectsPool +from .proxy import RealSubject, Proxy +from .subclass_registry import SubclassRegistry + + +__all__ = ['Observer', 'Subject', 'ObjectsPool', 'SubclassRegistry', + 'RealSubject', 'Proxy'] diff --git a/src/artificial_artwork/utils/proxy.py b/src/artificial_artwork/utils/proxy.py index 360871c..19e9fce 100644 --- a/src/artificial_artwork/utils/proxy.py +++ b/src/artificial_artwork/utils/proxy.py @@ -28,7 +28,7 @@ def request(self, *args, **kwargs) -> None: raise NotImplementedError -# TODO use generic typers to define RealSubject at runtime +# future work use generic types to define RealSubject at runtime # then the client code will not have to overide the RealSubject class Proxy(Subject): """ diff --git a/src/artificial_artwork/utils/subclass_registry.py b/src/artificial_artwork/utils/subclass_registry.py index 9fd38e7..6a38042 100644 --- a/src/artificial_artwork/utils/subclass_registry.py +++ b/src/artificial_artwork/utils/subclass_registry.py @@ -1,8 +1,13 @@ """Exposes the SubclassRegistry that allows to define a single registration point of one or more subclasses of a (common parent) class.""" +from typing import TypeVar, Generic, Dict -class SubclassRegistry(type): +T = TypeVar('T') + + +class SubclassRegistry(type, Generic[T]): + subclasses: Dict[str, type] """Subclass Registry A (parent) class using this class as metaclass gains the 'subclasses' class attribute as well as the 'create' and @@ -53,7 +58,7 @@ def __new__(mcs, *args, **kwargs): class_object.subclasses = {} return class_object - def create(cls, subclass_identifier, *args, **kwargs): + def create(cls, subclass_identifier, *args, **kwargs) -> T: """Create an instance of a registered subclass, given its unique identifier and runtime (constructor) arguments. Invokes the identified subclass constructor passing any supplied arguments. The user needs to know the arguments diff --git a/src/stubs/imageio.pyi b/src/stubs/imageio.pyi index 887d11f..69f3149 100644 --- a/src/stubs/imageio.pyi +++ b/src/stubs/imageio.pyi @@ -4,9 +4,7 @@ from typing import Union ImSaveFormatParamType = Union[str, None] -# Function bodies cannot be completely removed. By convention, -# we replace them with `...` instead of the `pass` statement. def imread(file_path: str) -> npt.NDArray: ... -# We can do the same with default arguments. + def imsave(file_path: str, image: npt.NDArray, format: ImSaveFormatParamType) -> None: ... diff --git a/src/stubs/scipy/__init__.pyi b/src/stubs/scipy/__init__.pyi new file mode 100644 index 0000000..e69de29 diff --git a/src/stubs/scipy/io.pyi b/src/stubs/scipy/io.pyi new file mode 100644 index 0000000..e4b8095 --- /dev/null +++ b/src/stubs/scipy/io.pyi @@ -0,0 +1,6 @@ + +import numpy.typing as npt +from typing import Dict + + +def loadmat(file_path: str) -> Dict[str, npt.NDArray]: ... diff --git a/src/stubs/tensorflow/__init__.pyi b/src/stubs/tensorflow/__init__.pyi new file mode 100644 index 0000000..6171acf --- /dev/null +++ b/src/stubs/tensorflow/__init__.pyi @@ -0,0 +1,41 @@ +from typing import TypeVar, Protocol, Union, List + + +T = TypeVar('T') + +# future work: change T to indicate either numpy arrays or tensorflow tensors +def matmul(volume_1: T, volume_2: T) -> T: ... + +Tensor = TypeVar('Tensor') + +def transpose(volume_1: Tensor) -> Tensor: ... + + +def constant(tensor: Tensor) -> Tensor: ... + + +ksize_type = Union[int, List[int]] + + +class nn(Protocol): + @staticmethod + def avg_pool(layer: Tensor, ksize: ksize_type, strides: ksize_type, padding: str) -> Tensor: ... + + @staticmethod + def relu(layer: Tensor) -> Tensor: ... + + @staticmethod + def conv2d(layer: Tensor, filter: Tensor, strides: ksize_type, padding: str) -> Tensor: ... + + +class TrainAPI(Protocol): + @staticmethod + def AdamOptimizer(learning_rate: float): pass + +class TensorflowV1Api(Protocol): + nn: nn + train: TrainAPI + + +class compat(Protocol): + v1: TensorflowV1Api diff --git a/tests/conftest.py b/tests/conftest.py index 74f8aaa..a954ccb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ import pytest +from artificial_artwork.pretrained_model import model_handler + @pytest.fixture def test_suite(): @@ -53,12 +55,6 @@ def __exit__(self, type, value, traceback): return MySession -# @pytest.fixture -# def default_image_processing_config(): -# from artificial_artwork.image import ImageProcessingConfig -# return ImageProcessingConfig.from_image_dimensions() - - @pytest.fixture def image_factory(): """Production Image Factory. @@ -127,3 +123,163 @@ def iterate(self): return i return TestSubject + + +@pytest.fixture +def toy_model_data(): + import numpy as np + from artificial_artwork.pretrained_model import ModelHandlerFacility + from artificial_artwork.pre_trained_models.vgg import VggModelRoutines, VggModelHandler + + from functools import reduce + model_layers = ( + 'conv1_1', + 'relu1', + 'maxpool1', + ) + convo_w_weights_shape = (3, 3, 3, 4) + + class ToyModelRoutines(VggModelRoutines): + + def load_layers(self, file_path: str): + return { + 'layers': [[ + [[[[model_layers[0]], 'unused', [[ + np.reshape(np.array([i for i in range(1, reduce(lambda i,j: i*j, convo_w_weights_shape)+1)], dtype=np.float32), convo_w_weights_shape), + np.array([5], dtype=np.float32) + ]]]]], + [[[[model_layers[1]], 'unused', [['W', 'b']]]]], + [[[[model_layers[2]], 'unused', [['W', 'b']]]]], + ]] + } + + + toy_model_routines = ToyModelRoutines() + + @ModelHandlerFacility.factory.register_as_subclass('toy') + class ToyModelHandler(VggModelHandler): + def _load_model_layers(self): + return toy_model_routines.load_layers('')['layers'][0] + + @property + def model_routines(self): + return toy_model_routines + + return type('TMD', (), { + 'expected_layers': model_layers, + }) + + +@pytest.fixture +def toy_network_design(): + # layers we pick to use for our Neural Network + network_layers = ('conv1_1',) + weight = 1.0 / len(network_layers) + style_layers = [(layer_id, weight) for layer_id in network_layers] + return type('ModelDesign', (), { + 'network_layers': ( + 'conv1_1', + ), + 'style_layers': style_layers, + 'output_layer': 'conv1_1', + }) + + +@pytest.fixture +def image_manager_class(): + from artificial_artwork.nst_image import ImageManager + return ImageManager + + +## Supported pretrained models and their expected layers + +@pytest.fixture +def vgg_layers(): + """The vgg image model network's layer structure.""" + VGG_LAYERS = ( + (0, 'conv1_1') , # (3, 3, 3, 64) + (1, 'relu1_1') , + (2, 'conv1_2') , # (3, 3, 64, 64) + (3, 'relu1_2') , + (4, 'pool1') , # maxpool + (5, 'conv2_1') , # (3, 3, 64, 128) + (6, 'relu2_1') , + (7, 'conv2_2') , # (3, 3, 128, 128) + (8, 'relu2_2') , + (9, 'pool2') , + (10, 'conv3_1'), # (3, 3, 128, 256) + (11, 'relu3_1'), + (12, 'conv3_2'), # (3, 3, 256, 256) + (13, 'relu3_2'), + (14, 'conv3_3'), # (3, 3, 256, 256) + (15, 'relu3_3'), + (16, 'conv3_4'), # (3, 3, 256, 256) + (17, 'relu3_4'), + (18, 'pool3') , + (19, 'conv4_1'), # (3, 3, 256, 512) + (20, 'relu4_1'), + (21, 'conv4_2'), # (3, 3, 512, 512) + (22, 'relu4_2'), + (23, 'conv4_3'), # (3, 3, 512, 512) + (24, 'relu4_3'), + (25, 'conv4_4'), # (3, 3, 512, 512) + (26, 'relu4_4'), + (27, 'pool4') , + (28, 'conv5_1'), # (3, 3, 512, 512) + (29, 'relu5_1'), + (30, 'conv5_2'), # (3, 3, 512, 512) + (31, 'relu5_2'), + (32, 'conv5_3'), # (3, 3, 512, 512) + (33, 'relu5_3'), + (34, 'conv5_4'), # (3, 3, 512, 512) + (35, 'relu5_4'), + (36, 'pool5'), + (37, 'fc6'), # fullyconnected (7, 7, 512, 4096) + (38, 'relu6'), + (39, 'fc7'), # fullyconnected (1, 1, 4096, 4096) + (40, 'relu7'), + (41, 'fc8'), # fullyconnected (1, 1, 4096, 1000) + (42, 'prob'), # softmax + ) + + return tuple((layer_id for _, layer_id in VGG_LAYERS)) + + +import os +PRODUCTION_IMAGE_MODEL = os.environ.get('AA_VGG_19', 'PRETRAINED_MODEL_NOT_FOUND') + + +@pytest.fixture +def pre_trained_models_1(vgg_layers, toy_model_data, toy_network_design): + from artificial_artwork.production_networks import NetworkDesign + from artificial_artwork.pretrained_model import ModelHandlerFacility + return { + 'vgg': type('NSTModel', (), { + 'pretrained_model': type('PTM', (), { + 'expected_layers': vgg_layers, + 'id': 'vgg', + 'handler': ModelHandlerFacility.create('vgg'), + }), + 'network_design': NetworkDesign.from_default_vgg() + }), + 'toy': type('NSTModel', (), { + 'pretrained_model': type('PTM', (), { + 'expected_layers': toy_model_data.expected_layers, + 'id': 'toy', + 'handler': ModelHandlerFacility.create('toy'), + }), + 'network_design': NetworkDesign( + toy_network_design.network_layers, + toy_network_design.style_layers, + toy_network_design.output_layer, + ) + }), + } + +@pytest.fixture +def model(pre_trained_models_1): + import os + return { + True: pre_trained_models_1['vgg'], + False: pre_trained_models_1['toy'], + }[os.path.isfile(PRODUCTION_IMAGE_MODEL)] diff --git a/tests/data/blue-red_w300-h225.jpg b/tests/data/blue-red_w300-h225.jpg new file mode 100644 index 0000000..cc9fa41 Binary files /dev/null and b/tests/data/blue-red_w300-h225.jpg differ diff --git a/tests/data/canoe_water_w300-h225.jpg b/tests/data/canoe_water_w300-h225.jpg new file mode 100644 index 0000000..36571f7 Binary files /dev/null and b/tests/data/canoe_water_w300-h225.jpg differ diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py new file mode 100644 index 0000000..ecdc72f --- /dev/null +++ b/tests/test_algorithm.py @@ -0,0 +1,143 @@ +import pytest + +from click.testing import CliRunner +from artificial_artwork.cli import cli +from unittest.mock import patch + + +runner = CliRunner() + +@pytest.fixture +def image_file_names(): + return type('Images', (), { + 'content': 'canoe_water_w300-h225.jpg', + 'style': 'blue-red_w300-h225.jpg' + }) + + +@pytest.fixture +def image_manager(image_manager_class): + """Production ImageManager instance.""" + import numpy as np + from artificial_artwork.image.image_operations import reshape_image, subtract + means = np.array([123.68, 116.779, 103.939]).reshape((1,1,1,3)) + return image_manager_class([ + lambda matrix: reshape_image(matrix, ((1,) + matrix.shape)), + lambda matrix: subtract(matrix, means), # input image must have 3 channels! + ]) + + +@pytest.fixture +def max_iterations_adapter_factory_method(): + from artificial_artwork.termination_condition_adapter_factory import TerminationConditionAdapterFactory + def create_max_iterations_termination_condition_adapter(iterations): + return TerminationConditionAdapterFactory.create('max-iterations', iterations) + return create_max_iterations_termination_condition_adapter + + +@pytest.fixture +def algorithm_parameters_class(): + from artificial_artwork.algorithm import AlogirthmParameters + return AlogirthmParameters + + +@pytest.fixture +def algorithm(algorithm_parameters_class): + from artificial_artwork.algorithm import NSTAlgorithm + def _create_algorithm(*parameters): + return NSTAlgorithm(algorithm_parameters_class(*parameters)) + return _create_algorithm + + +@pytest.fixture +def create_algorithm(algorithm, tmpdir): + def _create_algorithm(image_manager, termination_condition_adapter): + return algorithm( + image_manager.content_image, + image_manager.style_image, + termination_condition_adapter, + tmpdir + ) + return _create_algorithm + + +@pytest.fixture +def create_production_algorithm_runner(): + from artificial_artwork.nst_tf_algorithm import NSTAlgorithmRunner + from artificial_artwork.image.image_operations import noisy, convert_to_uint8 + from artificial_artwork.styling_observer import StylingObserver + from artificial_artwork.disk_operations import Disk + + noisy_ratio = 0.6 + def _create_production_algorithm_runner(termination_condition_adapter): + algorithm_runner = NSTAlgorithmRunner.default( + lambda matrix: noisy(matrix, noisy_ratio), + ) + + algorithm_runner.progress_subject.add( + termination_condition_adapter, + ) + algorithm_runner.persistance_subject.add( + StylingObserver(Disk.save_image, convert_to_uint8) + ) + return algorithm_runner + return _create_production_algorithm_runner + + +@pytest.fixture +def get_algorithm_runner(create_production_algorithm_runner): + def _get_algorithm_runner(termination_condition_adapter): + algorithm_runner = create_production_algorithm_runner( + termination_condition_adapter, + ) + return algorithm_runner + return _get_algorithm_runner + + +@pytest.fixture +def get_model_design(): + def _get_model_design(handler, network_design): + return type('ModelDesign', (), { + 'pretrained_model': handler, + 'network_design': network_design}) + return _get_model_design + + +def test_nst_runner( + get_algorithm_runner, + create_algorithm, + image_file_names, + get_model_design, + max_iterations_adapter_factory_method, + image_manager, + test_image, + model, + tmpdir): + """Test nst algorithm runner. + + Runs a simple 'smoke test' by iterating only 3 times. + """ + import os + ITERATIONS = 3 + + image_manager.load_from_disk(test_image(image_file_names.content), 'content') + image_manager.load_from_disk(test_image(image_file_names.style), 'style') + + assert image_manager.images_compatible == True + + termination_condition_adapter = max_iterations_adapter_factory_method(ITERATIONS) + + algorithm_runner = get_algorithm_runner(termination_condition_adapter) + + algorithm = create_algorithm(image_manager, termination_condition_adapter) + + model_design = get_model_design( + model.pretrained_model.handler, + model.network_design, + ) + model_design.pretrained_model.load_model_layers() + algorithm_runner.run(algorithm, model_design) + + template_string = image_file_names.content + '+' + image_file_names.style + '-' + '{}' + '.png' + assert os.path.isfile(os.path.join(tmpdir, template_string.format(1))) + assert os.path.isfile(os.path.join(tmpdir, template_string.format(ITERATIONS))) diff --git a/tests/test_algorithm_params.py b/tests/test_algorithm_params.py index e035f55..4356612 100644 --- a/tests/test_algorithm_params.py +++ b/tests/test_algorithm_params.py @@ -6,10 +6,6 @@ def algorithm_parameters(): return AlogirthmParameters( 'content_image', 'style_image', - [ - ('layer-1', 0.5), - ('layer-2', 0.5), - ], 'termination_condition', 'output_path', ) @@ -18,6 +14,5 @@ def algorithm_parameters(): def test_algorithm_parameters(algorithm_parameters): assert hasattr(algorithm_parameters, 'content_image') assert hasattr(algorithm_parameters, 'style_image') - assert hasattr(algorithm_parameters, 'style_layers') assert hasattr(algorithm_parameters, 'termination_condition') assert hasattr(algorithm_parameters, 'output_path') diff --git a/tests/test_cv_model.py b/tests/test_cv_model.py index 179c6cd..706764e 100644 --- a/tests/test_cv_model.py +++ b/tests/test_cv_model.py @@ -1,48 +1,18 @@ import os import pytest -from artificial_artwork.pretrained_model.model_loader import get_vgg_19_model_path, load_default_model_parameters my_dir = os.path.dirname(os.path.realpath(__file__)) -# IMAGE_MODEL_FILE_NAME = 'imagenet-vgg-verydeep-19.mat' - - -PRODUCTION_IMAGE_MODEL = os.environ.get('AA_VGG_19', 'PRETRAINED_MODEL_NOT_FOUND') - - -@pytest.fixture -def model_parameters(): - from artificial_artwork.pretrained_model.model_loader import load_default_model_parameters - return load_default_model_parameters() - # return load_default_model_parameters() - # from artificial_artwork.pretrained_model.model_loader import load_vgg_model_parameters - # return load_vgg_model_parameters - - -@pytest.fixture -def vgg_layers(): - """Expected layers structure of the vgg image model.""" - from artificial_artwork.pretrained_model.vgg_architecture import LAYERS - return LAYERS - - -@pytest.fixture -def style_network_architecture(): - from artificial_artwork.pretrained_model.image_model import LAYERS - return LAYERS - @pytest.fixture def graph_factory(): - from artificial_artwork.pretrained_model import graph_factory + from artificial_artwork.style_model import graph_factory return graph_factory -@pytest.mark.xfail(not os.path.isfile(PRODUCTION_IMAGE_MODEL), - reason="No file found to load the pretrained image (cv) model.") -def test_pretrained_model(model_parameters, graph_factory, vgg_layers, style_network_architecture): - layers = model_parameters['layers'] +def test_pretrained_model(model, graph_factory): + layers = model.pretrained_model.handler.load_model_layers() image_specs = type('ImageSpecs', (), { 'width': 400, @@ -50,67 +20,14 @@ def test_pretrained_model(model_parameters, graph_factory, vgg_layers, style_net 'color_channels': 3 })() - # verify original/loaded neural network has 43 layers - assert len(layers[0]) == 43 - - for i, name in enumerate(vgg_layers): - assert layers[0][i][0][0][0][0] == name - - graph = graph_factory.create(image_specs, model_parameters=model_parameters) - assert set(graph.keys()) == set(['input'] + list(style_network_architecture)) - - -@pytest.fixture -def graph_builder(): - from artificial_artwork.pretrained_model.model_loader import GraphBuilder - return GraphBuilder() - - -def test_building_layers(graph_builder): - import numpy as np - height = 2 - width = 6 - channels = 2 - expected_input_shape = (1, height, width, channels) - - graph_builder.input(width, height, nb_channels=channels) - # assert previous layer is the 'input' layer we just added/created - assert tuple(graph_builder._prev_layer.shape) == expected_input_shape - for w in range(width): - for h in range(height): - for c in range(channels): - assert graph_builder._prev_layer[0][h][w][c] == graph_builder.graph['input'][0][h][w][c] == 0 - - # create relu(convolution) layer - W = np.array(np.random.rand(*expected_input_shape[1:], channels), dtype=np.float32) - - b_weight = 6.0 - b = np.array([b_weight], dtype=np.float32) - graph_builder.relu_conv_2d('convo1', (W, b)) - - # assert the previous layer is the relu(convolution) layer we just added - assert tuple(graph_builder._prev_layer.shape) == expected_input_shape - for w in range(width): - for h in range(height): - for c in range(channels): - assert graph_builder._prev_layer[0][h][w][c] == graph_builder.graph['convo1'][0][h][w][c] - assert graph_builder._prev_layer[0][h][w][c] == b_weight # W[h][w][c][c] + b[0] - - - # create Average Pooling layer - layer_id = 'avgpool1' - graph_builder.avg_pool(layer_id) - - # assert previous layer is the layer we just added/created - expected_avg_pool_shape = (1, 1, 2, channels) - expected_avg_output = np.array( - [[[[b_weight, b_weight, b_weight], - [b_weight, b_weight, b_weight], - [b_weight, b_weight, b_weight] - ]]] - ,dtype=np.float32) - - for i in range(expected_avg_pool_shape[2]): - for c in range(channels): - assert graph_builder._prev_layer[0][0][i][c] == graph_builder.graph[layer_id][0][0][i][c] - assert graph_builder._prev_layer[0][0][i][c] == expected_avg_output[0][0][i][c] + assert len(layers) == len(model.pretrained_model.expected_layers) + for i, name in enumerate(model.pretrained_model.expected_layers): + assert layers[i][0][0][0][0] == name + + model.pretrained_model.handler.reporter = layers + model_design = type('ModelDesign', (), { + 'pretrained_model': model.pretrained_model.handler, + 'network_design': model.network_design + }) + graph = graph_factory.create(image_specs, model_design) + assert set(graph.keys()) == set(['input'] + list(model.network_design.network_layers)) diff --git a/tests/test_disk_ops.py b/tests/test_disk_ops.py index e9057b0..2731a73 100644 --- a/tests/test_disk_ops.py +++ b/tests/test_disk_ops.py @@ -15,7 +15,7 @@ def images_to_save(test_image): def test_save_operation(image_id, expected_size, disk, images_to_save, tmpdir): import os target_file = os.path.join(tmpdir, image_id) - disk.save_image(images_to_save[image_id], target_file, format='png') + disk.save_image(images_to_save[image_id], target_file, save_format='png') # assert actual size of file in disk matches the expected size assert os.path.getsize(target_file) == expected_size diff --git a/tests/test_graph_builder.py b/tests/test_graph_builder.py new file mode 100644 index 0000000..9660394 --- /dev/null +++ b/tests/test_graph_builder.py @@ -0,0 +1,63 @@ +import pytest + + +@pytest.fixture +def graph_builder(): + from artificial_artwork.style_model.graph_builder import GraphBuilder + return GraphBuilder() + + +def test_building_layers(graph_builder): + import tensorflow as tf + tf.compat.v1.reset_default_graph() + tf.compat.v1.enable_eager_execution() + import numpy as np + height = 2 + width = 6 + channels = 2 + expected_input_shape = (1, height, width, channels) + + graph_builder.input(type('ImageSpecs', (), { + 'width': width, + 'height': height, + 'color_channels': channels + })()) + # assert previous layer is the 'input' layer we just added/created + assert tuple(graph_builder._prev_layer.shape) == expected_input_shape + assert (graph_builder._prev_layer.numpy() - graph_builder.graph['input'].numpy()).all() == 0 + assert graph_builder.graph['input'].numpy().all() == 0 + + # create relu(convolution) layer + W = np.array(np.random.rand(*expected_input_shape[1:], channels), dtype=np.float32) + + b_weight = 6.0 + b = np.array([b_weight], dtype=np.float32) + graph_builder.relu_conv_2d('convo1', (W, b)) + + # assert the previous layer is the relu(convolution) layer we just added + assert tuple(graph_builder._prev_layer.shape) == expected_input_shape + assert (graph_builder.graph['convo1'].numpy() - graph_builder._prev_layer.numpy()).all() == 0 + # We expect that the tensor values are equal to the weight because the algorithm initializes input with tf.zeros + assert (graph_builder.graph['convo1'].numpy() - b).all() == 0 + + + # create Average Pooling layer + layer_id = 'avgpool1' + graph_builder.avg_pool(layer_id) + + # assert previous layer is the layer we just added/created + expected_avg_pool_shape = (1, 1, 3, 2) + expected_avg_output = np.array( + [[[[b_weight, b_weight, b_weight], + [b_weight, b_weight, b_weight], + [b_weight, b_weight, b_weight] + ]]] + ,dtype=np.float32) + assert graph_builder.graph[layer_id].numpy().shape == expected_avg_pool_shape + assert (graph_builder.graph[layer_id].numpy() - graph_builder._prev_layer.numpy()).all() == 0 + assert (graph_builder.graph[layer_id].numpy() - np.array([b_weight])).all() == 0 + + for i in range(2): + for c in range(2): + assert graph_builder._prev_layer[0][0][i][c] == graph_builder.graph[layer_id][0][0][i][c] + assert graph_builder._prev_layer[0][0][i][c] == expected_avg_output[0][0][i][c] diff --git a/tests/test_layers_selection.py b/tests/test_layers_selection.py index 7c8b10f..b48295f 100644 --- a/tests/test_layers_selection.py +++ b/tests/test_layers_selection.py @@ -1,6 +1,18 @@ import pytest +@pytest.fixture +def style_layer_class(): + from artificial_artwork.production_networks.style_layer_selector import NSTStyleLayer + return NSTStyleLayer + + +@pytest.fixture +def layers_selection_class(): + from artificial_artwork.production_networks.style_layer_selector import NSTLayersSelection + return NSTLayersSelection + + @pytest.fixture def valid_style_layers(): def _valid_style_layers_selection(nb_layers): @@ -9,25 +21,21 @@ def _valid_style_layers_selection(nb_layers): @pytest.fixture -def valid_nst_layers_list(valid_style_layers): - from artificial_artwork.style_layer_selector import NSTStyleLayer - return [NSTStyleLayer(*layer) for layer in valid_style_layers(5)] +def valid_nst_layers_list(valid_style_layers, style_layer_class): + return [style_layer_class(*layer) for layer in valid_style_layers(5)] @pytest.fixture -def invalid_nst_layers_list(): - from artificial_artwork.style_layer_selector import NSTStyleLayer - return [NSTStyleLayer(*layer) for layer in [ +def invalid_nst_layers_list(style_layer_class): + return [style_layer_class(*layer) for layer in [ ('conv1_1', 0.2), ('conv2_1', 0.5), ('conv3_1', 0.5),]] @pytest.fixture -def layers_selection(valid_style_layers): - from artificial_artwork.style_layer_selector import NSTLayersSelection - return NSTLayersSelection.from_tuples(valid_style_layers(5)) - +def layers_selection(valid_style_layers, layers_selection_class): + return layers_selection_class.from_tuples(valid_style_layers(5)) def test_layers_selection(layers_selection, valid_nst_layers_list, invalid_nst_layers_list): @@ -57,13 +65,11 @@ def invalid_style_layers_list(request): return request.param -def test_invalid_construction(invalid_style_layers_list): - from artificial_artwork.style_layer_selector import NSTLayersSelection +def test_invalid_construction(invalid_style_layers_list, layers_selection_class): with pytest.raises(ValueError): - _ = NSTLayersSelection.from_tuples(invalid_style_layers_list) + _ = layers_selection_class.from_tuples(invalid_style_layers_list) -def test_invalid_layer_coefficient(): - from artificial_artwork.style_layer_selector import NSTStyleLayer +def test_invalid_layer_coefficient(style_layer_class): with pytest.raises(ValueError): - _ = NSTStyleLayer('layer-id', 1.1) + _ = style_layer_class('layer-id', 1.1) diff --git a/tests/test_notification.py b/tests/test_notification.py index 9f16cda..f575a0b 100644 --- a/tests/test_notification.py +++ b/tests/test_notification.py @@ -3,13 +3,13 @@ @pytest.fixture def subject(): - from artificial_artwork.utils.notification import Subject + from artificial_artwork.utils import Subject return Subject @pytest.fixture def observer(): - from artificial_artwork.utils.notification import Observer + from artificial_artwork.utils import Observer return Observer @@ -55,8 +55,7 @@ def some_business_logic(self) -> None: happen (or after it). """ print("\nSubject: I'm doing something important.") - from random import randrange - self._state = randrange(0, 10) + self._state = 2 print(f"Subject: My state has just changed to: {self._state}") self.notify() @@ -69,8 +68,7 @@ def update(self, a_subject) -> None: assert id(s1) != id(s2) assert id(s1._observers) != id(s2._observers) o1, o2 = ObserverA(), ObserverB() - # s2.attach(o1) - # s2.attach(o2) + s2.add(o1, o2) # business logic print(s2._observers) diff --git a/tests/test_nst_image.py b/tests/test_nst_image.py index fbb039d..5895aab 100644 --- a/tests/test_nst_image.py +++ b/tests/test_nst_image.py @@ -2,9 +2,8 @@ @pytest.fixture -def image_manager(): - from artificial_artwork.nst_image import ImageManager - return ImageManager([lambda array: array + 2]) +def toy_image_manager(image_manager_class): + return image_manager_class([lambda array: array + 2]) @pytest.fixture @@ -19,17 +18,17 @@ def incompatible_image(test_image): return test_image('wikipedia-logo.png') -def test_image_manager(image_manager, compatible_images, incompatible_image): - assert image_manager.images_compatible == False +def test_image_manager(toy_image_manager, compatible_images, incompatible_image): + assert toy_image_manager.images_compatible == False - image_manager.load_from_disk(compatible_images.content, 'content') - assert image_manager.images_compatible == False + toy_image_manager.load_from_disk(compatible_images.content, 'content') + assert toy_image_manager.images_compatible == False - image_manager.load_from_disk(compatible_images.style, 'style') - assert image_manager.images_compatible == True + toy_image_manager.load_from_disk(compatible_images.style, 'style') + assert toy_image_manager.images_compatible == True - image_manager.load_from_disk(incompatible_image, 'content') - assert image_manager.images_compatible == False + toy_image_manager.load_from_disk(incompatible_image, 'content') + assert toy_image_manager.images_compatible == False with pytest.raises(ValueError): - image_manager.load_from_disk(compatible_images.content, 'unknown-type') + toy_image_manager.load_from_disk(compatible_images.content, 'unknown-type') diff --git a/tests/test_subclass_registry.py b/tests/test_subclass_registry.py index 279b11a..49e51d1 100644 --- a/tests/test_subclass_registry.py +++ b/tests/test_subclass_registry.py @@ -3,7 +3,8 @@ @pytest.fixture def subclass_registry_module(): - from artificial_artwork.utils.subclass_registry import SubclassRegistry, InstantiationError + from artificial_artwork.utils import SubclassRegistry + from artificial_artwork.utils.subclass_registry import InstantiationError return type('M', (), { 'SubclassRegistry': SubclassRegistry, 'InstantiationError': InstantiationError, diff --git a/tests/test_termination_condition_adapter.py b/tests/test_termination_condition_adapter.py index a511cba..5a1468d 100644 --- a/tests/test_termination_condition_adapter.py +++ b/tests/test_termination_condition_adapter.py @@ -2,9 +2,8 @@ @pytest.fixture def termination_condition_adapter(termination_condition): - from artificial_artwork.termination_condition_adapter import TerminationConditionAdapterFactory - termination_condition_instance = termination_condition('max-iterations', 4) - return TerminationConditionAdapterFactory.create('max-iterations', termination_condition_instance) + from artificial_artwork.termination_condition_adapter_factory import TerminationConditionAdapterFactory + return TerminationConditionAdapterFactory.create('max-iterations', 4) @@ -21,10 +20,8 @@ def test_adapter(test_objects): # subscribe adapter to broadcaster test_objects.broadcaster.subject.attach(test_objects.adapter) - # select a loop function (from test implementations) - # iterate iterations = test_objects.broadcaster.iterate() - # assert iterations completed are the expected nb of oiterations + # assert iterations completed are the expected nb of iterations assert iterations == 4 diff --git a/tests/test_total_cost.py b/tests/test_total_cost.py deleted file mode 100644 index cfc7b44..0000000 --- a/tests/test_total_cost.py +++ /dev/null @@ -1,43 +0,0 @@ -import pytest - -@pytest.fixture -def total_cost(): - from artificial_artwork.cost_computer import NSTCostComputer - return NSTCostComputer.compute - - -@pytest.mark.parametrize('a, x, b, y', [ - (1, 2, 3, 4), - (0, 1, 2, 3), -]) -def test_total_cost_computation(a, x, b, y, total_cost): - assert total_cost(x, y, a, b) == a * x + b * y - - -@pytest.mark.parametrize('seed, a, b', [ - (3, 10, 40), -]) -def test_random_total_cost_computation(seed, a, b, total_cost, session): - import numpy as np - with session(seed) as test: - np.random.seed(seed) - J_content = abs(np.random.randn()) - J_style = abs(np.random.randn()) - J = total_cost(J_content, J_style, alpha=a, beta=b) - assert abs(J - 35.34667875478276) < 1e-6 - - -@pytest.mark.parametrize('seed', [ - (3,), -]) -def test_default_total_cost_computation(seed, total_cost, session): - import numpy as np - - with session(seed) as test: - np.random.seed(seed) - J_content = abs(np.random.randn()) - J_style = abs(np.random.randn()) - J1 = total_cost(J_content, J_style) - J2 = total_cost(J_content, J_style, alpha=10, beta=40) - assert J1 == J2 - assert abs(J1 - 65.36223497201107) < 1e-6 diff --git a/tox.ini b/tox.ini index 7d9bb6f..eafe5bb 100644 --- a/tox.ini +++ b/tox.ini @@ -88,10 +88,12 @@ deps = click numpy imageio + scipy mypy skip_install = true commands = - mypy {posargs} --follow-imports skip --install-types {toxinidir}/src/{env:PY_PACKAGE} + ; mypy --follow-imports skip --install-types {posargs:{toxinidir}/src/} + mypy --install-types {posargs:{toxinidir}/src/} -v ## PYTHON PACKAGING