diff --git a/test/Ne16TestConf.py b/test/Ne16TestConf.py new file mode 100644 index 0000000..889a1fe --- /dev/null +++ b/test/Ne16TestConf.py @@ -0,0 +1,140 @@ +# Luka Macan +# +# Copyright 2023 ETH Zurich and University of Bologna +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations +from typing import List, Union, Optional +from Ne16 import Ne16 +from NnxTestClasses import NnxTestConf +from TestClasses import implies, KernelShape, Padding, Stride, IntegerType +from pydantic import field_validator, model_validator + + +class Ne16TestConf(NnxTestConf): + @field_validator("kernel_shape") + @classmethod + def check_valid_kernel_shape(cls, v: KernelShape) -> KernelShape: + assert v == KernelShape(height=1, width=1) or v == KernelShape( + height=3, width=3 + ), f"Unsupported kernel shape {v}. Supported 1x1 and 3x3." + return v + + @field_validator("stride") + @classmethod + def check_valid_stride(cls, v: Stride) -> Stride: + assert v == Stride(height=1, width=1) or v == Stride( + height=2, width=2 + ), f"Unsupported stride {v}. Supported 1x1 and 2x2." + return v + + @staticmethod + def _check_type( + name: str, _type: IntegerType, allowed_types: List[Union[IntegerType, str]] + ) -> None: + assert ( + _type in allowed_types + ), f"Unsupported {name} type {_type}. Supported types: {allowed_types}" + + @field_validator("in_type") + @classmethod + def check_valid_in_type(cls, v: IntegerType) -> IntegerType: + Ne16TestConf._check_type("in_type", v, ["uint8"]) + return v + + @field_validator("out_type") + @classmethod + def check_valid_out_type(cls, v: IntegerType) -> IntegerType: + Ne16TestConf._check_type("out_type", v, ["uint8", "int8"]) + return v + + @field_validator("weight_type") + @classmethod + def check_valid_weight_type(cls, v: IntegerType) -> IntegerType: + Ne16TestConf._check_type("weight_type", v, ["int8"]) + return v + + @field_validator("scale_type") + @classmethod + def check_valid_scale_type(cls, v: Optional[IntegerType]) -> Optional[IntegerType]: + if v is not None: + Ne16TestConf._check_type("scale_type", v, ["uint8", "uint32"]) + return v + + @field_validator("bias_type") + @classmethod + def check_valid_bias_type(cls, v: Optional[IntegerType]) -> Optional[IntegerType]: + if v is not None: + Ne16TestConf._check_type("bias_type", v, ["int32"]) + return v + + @model_validator(mode="after") # type: ignore + def check_valid_out_channel_with_stride_2x2(self) -> Ne16TestConf: + assert implies( + self.stride == Stride(height=2, width=2), self.out_channel % 2 == 0 + ), f"With stride 2x2 supported only even output channel sizes. Given output channel {self.out_channel}" + return self + + @model_validator(mode="after") # type: ignore + def check_valid_depthwise(self) -> Ne16TestConf: + assert implies( + self.depthwise, self.kernel_shape == KernelShape(height=3, width=3) + ), f"Depthwise supported only on 3x3 kernel shape. Given kernel shape {self.kernel_shape}." 
+        assert implies(self.depthwise, self.in_channel == self.out_channel), (
+            f"Input and output channel should be the same in a depthwise layer. "
+            f"input channel: {self.in_channel}, output channel: {self.out_channel}"
+        )
+        return self
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_padding_with_kernel_shape_1x1(self) -> Ne16TestConf:
+        assert implies(
+            self.kernel_shape == KernelShape(height=1, width=1),
+            self.padding == Padding(top=0, bottom=0, left=0, right=0),
+        ), f"No padding on 1x1 kernel. Given padding {self.padding}"
+        return self
+
+    @field_validator("has_norm_quant")
+    @classmethod
+    def check_valid_has_norm_quant(cls, v: bool) -> bool:
+        assert v == True, f"Untested without has_norm_quant."
+        return v
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_norm_quant_types_when_has_norm_quant(self) -> Ne16TestConf:
+        if self.has_norm_quant:
+            assert self.scale_type is not None, "Scale type was not provided."
+            if self.has_bias:
+                assert self.bias_type is not None, "Bias type was not provided."
+        return self
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_out_type_with_flags(self) -> Ne16TestConf:
+        assert implies(
+            not self.has_norm_quant, self.out_type == Ne16.ACCUMULATOR_TYPE
+        ), (
+            f"Without quantization, the output type has to be equal to the "
+            f"accumulator type {Ne16.ACCUMULATOR_TYPE}. Given output type {self.out_type}"
+        )
+        assert implies(
+            self.has_norm_quant,
+            (self.has_relu and not self.out_type._signed)
+            or (not self.has_relu and self.out_type._signed),
+        ), (
+            f"Output type has to be unsigned when there is relu, otherwise signed. "
+            f"Given output type {self.out_type} and has_relu {self.has_relu}"
+        )
+        return self
diff --git a/test/NeurekaTestConf.py b/test/NeurekaTestConf.py
new file mode 100644
index 0000000..dad7fc4
--- /dev/null
+++ b/test/NeurekaTestConf.py
@@ -0,0 +1,140 @@
+# Luka Macan
+#
+# Copyright 2023 ETH Zurich and University of Bologna
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+from Neureka import Neureka
+from typing import List, Union, Optional
+from NnxTestClasses import NnxTestConf
+from TestClasses import implies, KernelShape, Padding, Stride, IntegerType
+from pydantic import field_validator, model_validator
+
+
+class NeurekaTestConf(NnxTestConf):
+    @field_validator("kernel_shape")
+    @classmethod
+    def check_valid_kernel_shape(cls, v: KernelShape) -> KernelShape:
+        assert v == KernelShape(height=1, width=1) or v == KernelShape(
+            height=3, width=3
+        ), f"Unsupported kernel shape {v}. Supported 1x1 and 3x3."
+        return v
+
+    @field_validator("stride")
+    @classmethod
+    def check_valid_stride(cls, v: Stride) -> Stride:
+        assert v == Stride(height=1, width=1) or v == Stride(
+            height=2, width=2
+        ), f"Unsupported stride {v}. Supported 1x1 and 2x2."
+        return v
+
+    @staticmethod
+    def _check_type(
+        name: str, _type: IntegerType, allowed_types: List[Union[IntegerType, str]]
+    ) -> None:
+        assert (
+            _type in allowed_types
+        ), f"Unsupported {name} type {_type}. Supported types: {allowed_types}"
+
+    @field_validator("in_type")
+    @classmethod
+    def check_valid_in_type(cls, v: IntegerType) -> IntegerType:
+        NeurekaTestConf._check_type("in_type", v, ["uint8"])
+        return v
+
+    @field_validator("out_type")
+    @classmethod
+    def check_valid_out_type(cls, v: IntegerType) -> IntegerType:
+        NeurekaTestConf._check_type("out_type", v, ["uint8", "int8"])
+        return v
+
+    @field_validator("weight_type")
+    @classmethod
+    def check_valid_weight_type(cls, v: IntegerType) -> IntegerType:
+        NeurekaTestConf._check_type("weight_type", v, ["int8"])
+        return v
+
+    @field_validator("scale_type")
+    @classmethod
+    def check_valid_scale_type(cls, v: Optional[IntegerType]) -> Optional[IntegerType]:
+        if v is not None:
+            NeurekaTestConf._check_type("scale_type", v, ["uint8", "uint32"])
+        return v
+
+    @field_validator("bias_type")
+    @classmethod
+    def check_valid_bias_type(cls, v: Optional[IntegerType]) -> Optional[IntegerType]:
+        if v is not None:
+            NeurekaTestConf._check_type("bias_type", v, ["int32"])
+        return v
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_out_channel_with_stride_2x2(self) -> NeurekaTestConf:
+        assert implies(
+            self.stride == Stride(height=2, width=2), self.out_channel % 2 == 0
+        ), f"With stride 2x2 supported only even output channel sizes. Given output channel {self.out_channel}"
+        return self
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_depthwise(self) -> NeurekaTestConf:
+        assert implies(
+            self.depthwise, self.kernel_shape == KernelShape(height=3, width=3)
+        ), f"Depthwise supported only on 3x3 kernel shape. Given kernel shape {self.kernel_shape}."
+        assert implies(self.depthwise, self.in_channel == self.out_channel), (
+            f"Input and output channel should be the same in a depthwise layer. "
+            f"input channel: {self.in_channel}, output channel: {self.out_channel}"
+        )
+        return self
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_padding_with_kernel_shape_1x1(self) -> NeurekaTestConf:
+        assert implies(
+            self.kernel_shape == KernelShape(height=1, width=1),
+            self.padding == Padding(top=0, bottom=0, left=0, right=0),
+        ), f"No padding on 1x1 kernel. Given padding {self.padding}"
+        return self
+
+    @field_validator("has_norm_quant")
+    @classmethod
+    def check_valid_has_norm_quant(cls, v: bool) -> bool:
+        assert v == True, f"Untested without has_norm_quant."
+        return v
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_norm_quant_types_when_has_norm_quant(self) -> NeurekaTestConf:
+        if self.has_norm_quant:
+            assert self.scale_type is not None, "Scale type was not provided."
+            if self.has_bias:
+                assert self.bias_type is not None, "Bias type was not provided."
+        return self
+
+    @model_validator(mode="after")  # type: ignore
+    def check_valid_out_type_with_flags(self) -> NeurekaTestConf:
+        assert implies(
+            not self.has_norm_quant, self.out_type == Neureka.ACCUMULATOR_TYPE
+        ), (
+            f"Without quantization, the output type has to be equal to the "
+            f"accumulator type {Neureka.ACCUMULATOR_TYPE}. Given output type {self.out_type}"
+        )
+        assert implies(
+            self.has_norm_quant,
+            (self.has_relu and not self.out_type._signed)
+            or (not self.has_relu and self.out_type._signed),
+        ), (
+            f"Output type has to be unsigned when there is relu, otherwise signed. 
" + f"Given output type {self.out_type} and has_relu {self.has_relu}" + ) + return self diff --git a/test/Ne16TestClasses.py b/test/NnxTestClasses.py similarity index 60% rename from test/Ne16TestClasses.py rename to test/NnxTestClasses.py index d99e829..ed1b55e 100644 --- a/test/Ne16TestClasses.py +++ b/test/NnxTestClasses.py @@ -17,18 +17,18 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -from typing import List, Union, Optional, Set, Tuple +from typing import Callable, Union, Optional, Set, Tuple, Type import torch import numpy as np +import numpy.typing as npt import torch.nn.functional as F import os -from Ne16 import Ne16 from HeaderWriter import HeaderWriter -from TestClasses import implies, KernelShape, Padding, Stride, IntegerType -from pydantic import BaseModel, field_validator, model_validator, PositiveInt +from TestClasses import IntegerType, Stride, Padding, KernelShape, implies +from pydantic import BaseModel, PositiveInt -class Ne16TestConf(BaseModel): +class NnxTestConf(BaseModel): in_height: PositiveInt in_width: PositiveInt in_channel: PositiveInt @@ -46,122 +46,8 @@ class Ne16TestConf(BaseModel): has_bias: bool has_relu: bool - @field_validator("kernel_shape") - @classmethod - def check_valid_kernel_shape(cls, v: KernelShape) -> KernelShape: - assert v == KernelShape(height=1, width=1) or v == KernelShape( - height=3, width=3 - ), f"Unsupported kernel shape {v}. Supported 1x1 and 3x3." - return v - - @field_validator("stride") - @classmethod - def check_valid_stride(cls, v: Stride) -> Stride: - assert v == Stride(height=1, width=1) or v == Stride( - height=2, width=2 - ), f"Unsupported stride {v}. Supported 1x1 and 2x2." - return v - - @staticmethod - def _check_type( - name: str, _type: IntegerType, allowed_types: List[Union[IntegerType, str]] - ) -> None: - assert ( - _type in allowed_types - ), f"Unsupported {name} type {_type}. Supported types: {allowed_types}" - - @field_validator("in_type") - @classmethod - def check_valid_in_type(cls, v: IntegerType) -> IntegerType: - Ne16TestConf._check_type("in_type", v, ["uint8"]) - return v - - @field_validator("out_type") - @classmethod - def check_valid_out_type(cls, v: IntegerType) -> IntegerType: - Ne16TestConf._check_type("out_type", v, ["uint8", "int8"]) - return v - - @field_validator("weight_type") - @classmethod - def check_valid_weight_type(cls, v: IntegerType) -> IntegerType: - Ne16TestConf._check_type("weight_type", v, ["int8"]) - return v - - @field_validator("scale_type") - @classmethod - def check_valid_scale_type(cls, v: Optional[IntegerType]) -> Optional[IntegerType]: - if v is not None: - Ne16TestConf._check_type("scale_type", v, ["uint8", "uint32"]) - return v - - @field_validator("bias_type") - @classmethod - def check_valid_bias_type(cls, v: Optional[IntegerType]) -> Optional[IntegerType]: - if v is not None: - Ne16TestConf._check_type("bias_type", v, ["int32"]) - return v - - @model_validator(mode="after") # type: ignore - def check_valid_out_channel_with_stride_2x2(self) -> Ne16TestConf: - assert implies( - self.stride == Stride(height=2, width=2), self.out_channel % 2 == 0 - ), f"With stride 2x2 supported only even output channel sizes. Given output channel {self.out_channel}" - return self - - @model_validator(mode="after") # type: ignore - def check_valid_depthwise(self) -> Ne16TestConf: - assert implies( - self.depthwise, self.kernel_shape == KernelShape(height=3, width=3) - ), f"Depthwise supported only on 3x3 kernel shape. Given kernel shape {self.kernel_shape}." 
- assert implies(self.depthwise, self.in_channel == self.out_channel), ( - f"Input and output channel should be the same in a depthwise layer. " - f"input channel: {self.in_channel}, output channel: {self.out_channel}" - ) - return self - @model_validator(mode="after") # type: ignore - def check_valid_padding_with_kernel_shape_1x1(self) -> Ne16TestConf: - assert implies( - self.kernel_shape == KernelShape(height=1, width=1), - self.padding == Padding(top=0, bottom=0, left=0, right=0), - ), f"No padding on 1x1 kernel. Given padding {self.padding}" - return self - - @field_validator("has_norm_quant") - @classmethod - def check_valid_has_norm_quant(cls, v: bool) -> bool: - assert v == True, f"Untested without has_norm_quant." - return v - - @model_validator(mode="after") # type: ignore - def check_valid_norm_quant_types_when_has_norm_qunat(self) -> Ne16TestConf: - if self.has_norm_quant: - assert self.scale_type is not None, "Scale type was not provided." - if self.has_bias: - assert self.bias_type is not None, "Bias type was not provided." - return self - - @model_validator(mode="after") # type: ignore - def check_valid_out_type_with_flags(self) -> Ne16TestConf: - assert implies( - not self.has_norm_quant, self.out_type == Ne16.ACCUMULATOR_TYPE - ), ( - f"Without quantization, the output type has to be equal to the " - f"accumulator type {Ne16.ACCUMULATOR_TYPE}. Given output type {self.out_type}" - ) - assert implies( - self.has_norm_quant, - (self.has_relu and not self.out_type._signed) - or (not self.has_relu and self.out_type._signed), - ), ( - f"Output type has to be unsigned when there is relu, otherwise signed. " - f"Given output type {self.out_type} and has_relu {self.has_relu}" - ) - return self - - -class Ne16Test: +class NnxTest: _CONF_NAME = "conf.json" _INPUT_NAME = "input.pt" _OUTPUT_NAME = "output.pt" @@ -172,7 +58,7 @@ class Ne16Test: def __init__( self, - conf: Ne16TestConf, + conf: NnxTestConf, input: Optional[torch.Tensor], output: Optional[torch.Tensor], weight: Optional[torch.Tensor], @@ -188,7 +74,7 @@ def __init__( self.bias = bias self.global_shift = global_shift - def is_valid(self): + def is_valid(self) -> bool: return all( [ self.input is not None, @@ -203,22 +89,22 @@ def is_valid(self): def save_conf(self, path: Union[str, os.PathLike]) -> None: os.makedirs(path, exist_ok=True) - with open(os.path.join(path, Ne16Test._CONF_NAME), "w") as fp: + with open(os.path.join(path, NnxTest._CONF_NAME), "w") as fp: fp.write(self.conf.model_dump_json(indent=4)) def save_data(self, path: Union[str, os.PathLike]) -> None: os.makedirs(path, exist_ok=True) - torch.save(self.input, os.path.join(path, Ne16Test._INPUT_NAME)) - torch.save(self.output, os.path.join(path, Ne16Test._OUTPUT_NAME)) - torch.save(self.weight, os.path.join(path, Ne16Test._WEIGHT_NAME)) + torch.save(self.input, os.path.join(path, NnxTest._INPUT_NAME)) + torch.save(self.output, os.path.join(path, NnxTest._OUTPUT_NAME)) + torch.save(self.weight, os.path.join(path, NnxTest._WEIGHT_NAME)) if self.scale is not None: - torch.save(self.scale, os.path.join(path, Ne16Test._SCALE_NAME)) + torch.save(self.scale, os.path.join(path, NnxTest._SCALE_NAME)) if self.bias is not None: - torch.save(self.bias, os.path.join(path, Ne16Test._BIAS_NAME)) + torch.save(self.bias, os.path.join(path, NnxTest._BIAS_NAME)) if self.global_shift is not None: torch.save( - self.global_shift, os.path.join(path, Ne16Test._GLOBAL_SHIFT_NAME) + self.global_shift, os.path.join(path, NnxTest._GLOBAL_SHIFT_NAME) ) def save(self, path: Union[str, 
os.PathLike]) -> None: @@ -228,33 +114,33 @@ def save(self, path: Union[str, os.PathLike]) -> None: @staticmethod def is_test_dir(path: Union[str, os.PathLike]) -> bool: fileset = set(os.listdir(path)) - required_fileset = set([Ne16Test._CONF_NAME]) + required_fileset = set([NnxTest._CONF_NAME]) return required_fileset.issubset(fileset) @classmethod - def load(cls, path: Union[str, os.PathLike]) -> "Ne16Test": - assert Ne16Test.is_test_dir( + def load(cls, confCls: Type[NnxTestConf], path: Union[str, os.PathLike]) -> NnxTest: + assert NnxTest.is_test_dir( path ), f"ERROR: Test {path} does not contain the necessary files." - with open(os.path.join(path, Ne16Test._CONF_NAME), "r") as fp: - conf = Ne16TestConf.model_validate_json(fp.read()) + with open(os.path.join(path, NnxTest._CONF_NAME), "r") as fp: + conf = confCls.model_validate_json(fp.read()) def load_if_exist(filename: str) -> Optional[torch.Tensor]: filepath = os.path.join(path, filename) return torch.load(filepath) if os.path.isfile(filepath) else None - input = load_if_exist(Ne16Test._INPUT_NAME) - output = load_if_exist(Ne16Test._OUTPUT_NAME) - weight = load_if_exist(Ne16Test._WEIGHT_NAME) - scale = load_if_exist(Ne16Test._SCALE_NAME) - bias = load_if_exist(Ne16Test._BIAS_NAME) - global_shift = load_if_exist(Ne16Test._GLOBAL_SHIFT_NAME) + input = load_if_exist(NnxTest._INPUT_NAME) + output = load_if_exist(NnxTest._OUTPUT_NAME) + weight = load_if_exist(NnxTest._WEIGHT_NAME) + scale = load_if_exist(NnxTest._SCALE_NAME) + bias = load_if_exist(NnxTest._BIAS_NAME) + global_shift = load_if_exist(NnxTest._GLOBAL_SHIFT_NAME) return cls(conf, input, output, weight, scale, bias, global_shift) -class Ne16TestGenerator: +class NnxTestGenerator: _DEFAULT_SEED = 0 @staticmethod @@ -286,17 +172,18 @@ def _cast( @staticmethod def from_conf( - conf: Ne16TestConf, + conf: NnxTestConf, + accumulator_type: IntegerType, input: Optional[torch.Tensor] = None, weight: Optional[torch.Tensor] = None, scale: Optional[torch.Tensor] = None, bias: Optional[torch.Tensor] = None, global_shift: Optional[torch.Tensor] = None, - ) -> Ne16Test: - torch.manual_seed(Ne16TestGenerator._DEFAULT_SEED) + ) -> NnxTest: + torch.manual_seed(NnxTestGenerator._DEFAULT_SEED) if input is None: - input = Ne16TestGenerator._random_data( + input = NnxTestGenerator._random_data( _type=conf.in_type, shape=(1, conf.in_channel, conf.in_height, conf.in_width), ) @@ -314,7 +201,7 @@ def from_conf( ) if weight is None: - weight = Ne16TestGenerator._random_data( + weight = NnxTestGenerator._random_data( _type=conf.weight_type, shape=( conf.out_channel, @@ -333,14 +220,14 @@ def from_conf( groups=conf.in_channel if conf.depthwise else 1, ).type(torch.int64) # Use only the lower 32bits - output = Ne16TestGenerator._cast( - output, Ne16.ACCUMULATOR_TYPE, saturate=False - ).type(torch.int32) + output = NnxTestGenerator._cast(output, accumulator_type, saturate=False).type( + torch.int32 + ) if conf.has_norm_quant: if scale is None: assert conf.scale_type is not None - scale = Ne16TestGenerator._random_data( + scale = NnxTestGenerator._random_data( conf.scale_type, shape=(1, conf.out_channel, 1, 1) ) # Scale accumulators are in 48bit, so keeping the data in 64bit @@ -350,16 +237,16 @@ def from_conf( if conf.has_bias: # Saturating cast to int32 assert conf.bias_type is not None - output = Ne16TestGenerator._cast( + output = NnxTestGenerator._cast( output, conf.bias_type, saturate=True ).type(torch.int32) if bias is None: - bias = Ne16TestGenerator._random_data( + bias = 
NnxTestGenerator._random_data( conf.bias_type, shape=(1, conf.out_channel, 1, 1) ).type(torch.int32) output = output + bias - output = Ne16TestGenerator._cast( + output = NnxTestGenerator._cast( output, conf.bias_type, saturate=False ).type(torch.int32) @@ -367,15 +254,15 @@ def from_conf( output = F.relu(output) if global_shift is None: - global_shift = Ne16TestGenerator._global_shift( + global_shift = NnxTestGenerator._global_shift( output, conf.out_type, conf.has_relu ) output = output >> global_shift # Saturate into out_type - output = Ne16TestGenerator._cast(output, conf.out_type, saturate=True) + output = NnxTestGenerator._cast(output, conf.out_type, saturate=True) - return Ne16Test( + return NnxTest( conf=conf, input=input, output=output, @@ -386,28 +273,38 @@ def from_conf( ) @staticmethod - def regenerate(test: Ne16Test, regen_tensors: Set[str]) -> Ne16Test: + def regenerate(test: NnxTest, regen_tensors: Set[str]) -> NnxTest: test_tensors = set(["input", "output", "weight", "scale", "bias"]) load_tensors = test_tensors - regen_tensors kwargs = {tensor: getattr(test, tensor) for tensor in load_tensors} - return Ne16TestGenerator.from_conf(test.conf, **kwargs) + return NnxTestGenerator.from_conf(test.conf, **kwargs) -class Ne16TestHeaderGenerator: +class NnxTestHeaderGenerator: DEFAULT_HEADERS_DIR = "app/gen" - def __init__(self, headers_dir: Optional[Union[str, os.PathLike]] = None): + def __init__( + self, + weight_unroll: Callable[ + [npt.NDArray[np.uint8], int, bool], npt.NDArray[np.uint8] + ], + headers_dir: Optional[Union[str, os.PathLike]] = None, + ): if headers_dir is None: - headers_dir = Ne16TestHeaderGenerator.DEFAULT_HEADERS_DIR + headers_dir = NnxTestHeaderGenerator.DEFAULT_HEADERS_DIR self.header_writer = HeaderWriter(headers_dir) + # function that takes the weights in CoutCinK format, bitwidth, and a depthwise flag, + # and returns a numpy array of dtype=np.uint8 of data in a layout correct for the accelerator + self.weight_unroll = weight_unroll - def generate(self, test_name: str, test: Ne16Test): + def generate(self, test_name: str, test: NnxTest): assert test.input is not None and test.output is not None _, in_channel, in_height, in_width = test.input.shape _, out_channel, out_height, out_width = test.output.shape # Render input in_ctype = test.conf.in_type.ctype() + in_signed = test.conf.in_type._signed in_data = test.input.permute(0, 2, 3, 1).ravel() self.header_writer.generate_vector_files( "input", _type=in_ctype, size=in_data.numel(), init=in_data @@ -431,10 +328,10 @@ def generate(self, test_name: str, test: Ne16Test): weight_offset = -(2 ** (weight_bits - 1)) weight_out_ch, weight_in_ch, weight_ks_h, weight_ks_w = test.weight.shape weight_data: np.ndarray = test.weight.numpy() - weight_offset - weight_init = Ne16.weight_unroll( + weight_init = self.weight_unroll( weight_data.astype(np.uint8), weight_type._bits, - depthwise=test.conf.depthwise, + test.conf.depthwise, ) self.header_writer.generate_vector_files( "weight", _type="uint8_t", size=weight_init.size, init=weight_init @@ -470,6 +367,7 @@ def generate(self, test_name: str, test: Ne16Test): "height": in_height, "width": in_width, "channel": in_channel, + "signed": in_signed, "bits": 8, }, "output": { diff --git a/test/conf.toml b/test/conf.toml index 1222f1d..c24055a 100644 --- a/test/conf.toml +++ b/test/conf.toml @@ -22,7 +22,7 @@ # Ne16TestClasses.py:Ne16TestConf().check_valid() # Input dimensions -in_height = 3 +in_height = 4 in_width = 3 in_channel = 8 diff --git a/test/conftest.py 
b/test/conftest.py index 6c2c15b..b871141 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -18,7 +18,14 @@ import os from typing import Union -from Ne16TestClasses import Ne16Test, Ne16TestGenerator +from Ne16 import Ne16 +from Ne16TestConf import Ne16TestConf +from Neureka import Neureka +from NeurekaTestConf import NeurekaTestConf +from NnxTestClasses import NnxTest, NnxTestGenerator + + +_SUPPORTED_ACCELERATORS = ["ne16", "neureka"] def pytest_addoption(parser): @@ -39,6 +46,13 @@ def pytest_addoption(parser): default=False, help="Recursively search for tests in given test directories.", ) + parser.addoption( + "-A", + "--accelerator", + choices=_SUPPORTED_ACCELERATORS, + default="ne16", + help="Choose an accelerator to test. Default: ne16", + ) parser.addoption( "--regenerate", action="store_true", @@ -54,7 +68,7 @@ def pytest_addoption(parser): def _find_test_dirs(path: Union[str, os.PathLike]): - return [dirpath for dirpath, _, _ in os.walk(path) if Ne16Test.is_test_dir(dirpath)] + return [dirpath for dirpath, _, _ in os.walk(path) if NnxTest.is_test_dir(dirpath)] def pytest_generate_tests(metafunc): @@ -62,6 +76,7 @@ def pytest_generate_tests(metafunc): recursive = metafunc.config.getoption("recursive") regenerate = metafunc.config.getoption("regenerate") timeout = metafunc.config.getoption("timeout") + nnxName = metafunc.config.getoption("accelerator") if recursive: tests_dirs = test_dirs @@ -71,10 +86,24 @@ def pytest_generate_tests(metafunc): # (Re)Generate test data for test_dir in test_dirs: - test = Ne16Test.load(test_dir) + test = NnxTest.load(Ne16TestConf, test_dir) if not test.is_valid() or regenerate: - test = Ne16TestGenerator.from_conf(test.conf) + test = NnxTestGenerator.from_conf(test.conf, Ne16.ACCUMULATOR_TYPE) test.save_data(test_dir) + if nnxName == "ne16": + nnxCls = Ne16 + nnxTestConfCls = Ne16TestConf + elif nnxName == "neureka": + nnxCls = Neureka + nnxTestConfCls = NeurekaTestConf + else: + assert ( + False + ), f"Given accelerator {nnxName} not supported. 
Supported accelerators: {_SUPPORTED_ACCELERATORS}" + metafunc.parametrize("path", test_dirs) metafunc.parametrize("timeout", [timeout]) + metafunc.parametrize("nnxName", [nnxName]) + metafunc.parametrize("nnxCls", [nnxCls]) + metafunc.parametrize("nnxTestConfCls", [nnxTestConfCls]) diff --git a/test/test.py b/test/test.py index 39709b6..778c6ca 100644 --- a/test/test.py +++ b/test/test.py @@ -18,10 +18,12 @@ import os import re -from typing import Union, Optional, Tuple +from typing import Dict, Union, Optional, Tuple, Type import locale import subprocess -from Ne16TestClasses import Ne16Test, Ne16TestHeaderGenerator +from Ne16 import Ne16 +from Neureka import Neureka +from NnxTestClasses import NnxTest, NnxTestConf, NnxTestHeaderGenerator from pathlib import Path HORIZONTAL_LINE = "\n" + "-" * 100 + "\n" @@ -49,17 +51,29 @@ def captured_output( def execute_command( - cmd: str, timeout: int = 30, cflags: Optional[str] = None + cmd: str, + timeout: int = 30, + cflags: Optional[str] = None, + envflags: Optional[Dict[str, str]] = None, ) -> Tuple[bool, str, str, Optional[str]]: - app_cflags = 'APP_CFLAGS="' + " ".join(cflags) + '" ' if cflags else "" - cmd = cmd + app_cflags + env = os.environ + if cflags: + env["APP_CFLAGS"] = '"' + " ".join(cflags) + '"' + if envflags: + for key, value in envflags.items(): + env[key] = value status = None stdout = None try: proc = subprocess.run( - cmd.split(), check=True, capture_output=True, text=True, timeout=timeout + cmd.split(), + check=True, + capture_output=True, + text=True, + timeout=timeout, + env=env, ) status = True msg = "OK" @@ -94,15 +108,23 @@ def assert_message( return retval -def test(path: str, timeout: int): +def test( + path: str, + timeout: int, + nnxName: str, + nnxCls: Union[Type[Ne16], Type[Neureka]], + nnxTestConfCls: Type[NnxTestConf], +): test_name = path - test = Ne16Test.load(path) + test = NnxTest.load(nnxTestConfCls, path) - Ne16TestHeaderGenerator().generate(test_name, test) + NnxTestHeaderGenerator(nnxCls.weight_unroll).generate(test_name, test) Path("app/src/nnx_layer.c").touch() cmd = f"make -C app all run platform=gvsoc" - passed, msg, stdout, stderr = execute_command(cmd=cmd, timeout=timeout) + passed, msg, stdout, stderr = execute_command( + cmd=cmd, timeout=timeout, envflags={"ACCELERATOR": nnxName} + ) assert passed, assert_message(msg, test_name, cmd, stdout, stderr) diff --git a/test/testgen.py b/test/testgen.py index e748f2e..d27c28e 100644 --- a/test/testgen.py +++ b/test/testgen.py @@ -20,24 +20,36 @@ import argparse import json import toml -from typing import Optional, Union, Set -from Ne16TestClasses import ( - Ne16TestConf, - Ne16TestGenerator, - Ne16Test, - Ne16TestHeaderGenerator, +from typing import Optional, Type, Union, Set +from Ne16 import Ne16 +from Ne16TestConf import Ne16TestConf +from Neureka import Neureka +from NeurekaTestConf import NeurekaTestConf +from NnxTestClasses import ( + NnxTest, + NnxTestConf, + NnxTestGenerator, + NnxTestHeaderGenerator, ) -def headers_gen(args, test: Optional[Ne16Test] = None): +def headers_gen( + args, + nnxCls: Union[Type[Ne16], Type[Neureka]], + nnxTestConfCls: Type[NnxTestConf], + test: Optional[NnxTest] = None, +): if test is None: - test = Ne16Test.load(args.test_dir) + test = NnxTest.load(nnxTestConfCls, args.test_dir) + assert test is not None if not test.is_valid(): - test = Ne16TestGenerator.from_conf(test.conf) - Ne16TestHeaderGenerator().generate(args.test_dir, test) + test = NnxTestGenerator.from_conf(test.conf, nnxCls.ACCUMULATOR_TYPE) + 
NnxTestHeaderGenerator(nnxCls.weight_unroll).generate(args.test_dir, test) -def test_gen(args): +def test_gen( + args, nnxCls: Union[Type[Ne16], Type[Neureka]], nnxTestConfCls: Type[NnxTestConf] +): if args.conf.endswith(".toml"): test_conf_dict = toml.load(args.conf) elif args.conf.endswith(".json"): @@ -49,37 +61,67 @@ def test_gen(args): ) exit(-1) - test_conf = Ne16TestConf.model_validate(test_conf_dict) - test = Ne16TestGenerator.from_conf(test_conf) + test_conf = nnxTestConfCls.model_validate(test_conf_dict) + test = NnxTestGenerator.from_conf(test_conf, nnxCls.ACCUMULATOR_TYPE) if not args.skip_save: test.save(args.test_dir) if args.headers: - headers_gen(args, test) + headers_gen(args, nnxCls, nnxTestConfCls, test) -def _regen(path: Union[str, os.PathLike], regen_tensors: Set[str]) -> None: - test = Ne16Test.load(path) - test = Ne16TestGenerator.regenerate(test, regen_tensors) +def _regen( + path: Union[str, os.PathLike], + regen_tensors: Set[str], + nnxTestConfCls: Type[NnxTestConf], +) -> None: + test = NnxTest.load(nnxTestConfCls, path) + test = NnxTestGenerator.regenerate(test, regen_tensors) test.save(path) -def _regen_recursive(path: Union[str, os.PathLike], regen_tensors: Set[str]) -> None: - if Ne16Test.is_test_dir(path): - _regen(path, regen_tensors) +def _regen_recursive( + path: Union[str, os.PathLike], + regen_tensors: Set[str], + nnxTestConfCls: Type[NnxTestConf], +) -> None: + if NnxTest.is_test_dir(path): + _regen(path, regen_tensors, nnxTestConfCls) return for dirpath, _, _ in os.walk(path): - _regen_recursive(dirpath, regen_tensors) + _regen_recursive(dirpath, regen_tensors, nnxTestConfCls) -def test_regen(args): +def test_regen( + args, nnxCls: Union[Type[Ne16], Type[Neureka]], nnxTestConfCls: Type[NnxTestConf] +): + _ = nnxCls regen_tensors = set(args.tensors + ["output"]) for test_dir in args.test_dirs: if args.recurse: - _regen_recursive(test_dir, regen_tensors) + _regen_recursive(test_dir, regen_tensors, nnxTestConfCls) else: - _regen(test_dir, regen_tensors) + _regen(test_dir, regen_tensors, nnxTestConfCls) + + +def add_common_arguments(parser: argparse.ArgumentParser): + parser.add_argument( + "-t", + "--test-dir", + type=str, + dest="test_dir", + required=True, + help="Path to the test.", + ) + + parser.add_argument( + "-a", + "--accelerator", + choices=["ne16", "neureka"], + default="ne16", + help="Choose an accelerator. Default: ne16", + ) parser = argparse.ArgumentParser( @@ -91,14 +133,7 @@ def test_regen(args): parser_header = subparsers.add_parser( "headers", description="Generate headers for a single test." ) -parser_header.add_argument( - "-t", - "--test-dir", - type=str, - dest="test_dir", - required=True, - help="Path to the test." "basename.", -) +add_common_arguments(parser_header) parser_header.set_defaults(func=headers_gen) parser_test = subparsers.add_parser( @@ -112,14 +147,6 @@ def test_regen(args): required=True, help="Path to the configuration file.", ) -parser_test.add_argument( - "-t", - "--test-dir", - type=str, - dest="test_dir", - required=True, - help="Path to the test. " "basename.", -) parser_test.add_argument( "--headers", action="store_true", default=False, help="Generate headers." 
 )
@@ -130,6 +157,7 @@ def test_regen(args):
     dest="skip_save",
     help="Skip saving the test.",
 )
+add_common_arguments(parser_test)
 parser_test.set_defaults(func=test_gen)
 
 parser_regen = subparsers.add_parser("regen", description="Regenerate test tensors.")
@@ -138,25 +166,27 @@ def test_regen(args):
     type=str,
     nargs="?",
     default=[],
-    help="Tensors that should be regenerated. Output " "included by default.",
-)
-parser_regen.add_argument(
-    "-t",
-    "--test-dir",
-    action="append",
-    dest="test_dirs",
-    required=True,
-    help="Path to the test.",
+    help="Tensors that should be regenerated. Output included by default.",
 )
 parser_regen.add_argument(
     "-r",
     "--recursive",
     action="store_true",
     default=False,
-    help="Recursively search for test directiories " "inside given test directories.",
+    help="Recursively search for test directories inside given test directories.",
 )
+add_common_arguments(parser_regen)
 parser_regen.set_defaults(func=test_regen)
 
 args = parser.parse_args()
 
-args.func(args)
+if args.accelerator == "ne16":
+    nnxCls = Ne16
+    nnxTestConfCls = Ne16TestConf
+elif args.accelerator == "neureka":
+    nnxCls = Neureka
+    nnxTestConfCls = NeurekaTestConf
+else:
+    assert False, f"Unsupported accelerator {args.accelerator}."
+
+args.func(args, nnxCls, nnxTestConfCls)
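Note: the following is not part of the patch. It is a minimal usage sketch of how the refactored classes compose, mirroring the test_gen flow in test/testgen.py above; the conf.toml path and the test directory name are illustrative placeholders.

# Illustrative only: generate an NE16 test and its C headers (paths are placeholders).
import toml

from Ne16 import Ne16
from Ne16TestConf import Ne16TestConf
from NnxTestClasses import NnxTestGenerator, NnxTestHeaderGenerator

conf_dict = toml.load("conf.toml")              # hypothetical configuration path
conf = Ne16TestConf.model_validate(conf_dict)   # accelerator-specific validation
test = NnxTestGenerator.from_conf(conf, Ne16.ACCUMULATOR_TYPE)
test.save("tests/example")                      # hypothetical test directory
NnxTestHeaderGenerator(Ne16.weight_unroll).generate("example", test)

For Neureka, the same flow applies with Neureka and NeurekaTestConf substituted, which is exactly the selection that testgen.py and conftest.py perform from the --accelerator option.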