Skip to content

Commit

Permalink
Refactor quantizers
Browse files Browse the repository at this point in the history
  • Loading branch information
rwnobrega committed Nov 16, 2024
1 parent 69debbe commit 437bf67
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 176 deletions.
26 changes: 26 additions & 0 deletions src/komm/_quantization/AbstractScalarQuantizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from abc import ABC, abstractmethod

import numpy as np
import numpy.typing as npt


class AbstractScalarQuantizer(ABC):

@property
@abstractmethod
def levels(self) -> npt.NDArray[np.float64]:
pass

@property
@abstractmethod
def thresholds(self) -> npt.NDArray[np.float64]:
pass

@property
@abstractmethod
def num_levels(self) -> int:
pass

@abstractmethod
def __call__(self, input_signal: npt.ArrayLike) -> npt.NDArray[np.float64]:
pass
123 changes: 52 additions & 71 deletions src/komm/_quantization/ScalarQuantizer.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,76 @@
import numpy as np
import numpy.typing as npt
from attrs import field, frozen

from .AbstractScalarQuantizer import AbstractScalarQuantizer

class ScalarQuantizer:

@frozen
class ScalarQuantizer(AbstractScalarQuantizer):
r"""
General scalar quantizer. It is defined by a list of *levels*, $v_0, v_1, \ldots, v_{L-1}$, and a list of *thresholds*, $t_0, t_1, \ldots, t_L$, satisfying
$$
-\infty = t_0 < v_0 < t_1 < v_1 < \cdots < t_{L - 1} < v_{L - 1} < t_L = +\infty.
$$
Given an input $x \in \mathbb{R}$, the output of the quantizer is given by $y = v_i$ if and only if $t_i \leq x < t_{i+1}$, where $i \in [0:L)$.
To invoke the quantizer, call the object giving the input signal as parameter (see example in the constructor below).
To invoke the quantizer, call the object giving the input signal as parameter (see example below).
Attributes:
levels (Array1D[float]): The quantizer levels $v_0, v_1, \ldots, v_{L-1}$. It should be a list floats of length $L$.
thresholds (Array1D[float]): The quantizer finite thresholds $t_1, t_2, \ldots, t_{L-1}$. It should be a list of floats of length $L - 1$.
Examples:
The following example considers the $5$-level scalar quantizer whose characteristic (input × output) curve is depicted in the figure below.
<figure markdown>
![Scalar quantizer example.](/figures/scalar_quantizer_5.svg)
</figure>
The levels are
$$
v_0 = -2, ~ v_1 = -1, ~ v_2 = 0, ~ v_3 = 1, ~ v_4 = 2,
$$
and the thresholds are
$$
t_0 = -\infty, ~ t_1 = -1.5, ~ t_2 = -0.3, ~ t_3 = 0.8, ~ t_4 = 1.4, ~ t_5 = \infty.
$$
>>> quantizer = komm.ScalarQuantizer(levels=[-2.0, -1.0, 0.0, 1.0, 2.0], thresholds=[-1.5, -0.3, 0.8, 1.4])
>>> x = np.linspace(-2.5, 2.5, num=11)
>>> y = quantizer(x)
>>> np.vstack([x, y])
array([[-2.5, -2. , -1.5, -1. , -0.5, 0. , 0.5, 1. , 1.5, 2. , 2.5],
[-2. , -2. , -1. , -1. , -1. , 0. , 0. , 1. , 2. , 2. , 2. ]])
"""

def __init__(self, levels, thresholds):
r"""
Constructor for the class.
Parameters:
levels (Array1D[float]): The quantizer levels $v_0, v_1, \ldots, v_{L-1}$. It should be a list floats of length $L$.
thresholds (Array1D[float]): The finite quantizer thresholds $t_1, t_2, \ldots, t_{L-1}$. It should be a list of floats of length $L - 1$. Moreover, they must satisfy $v_0 < t_1 < v_1 < \cdots < t_{L - 1} < v_{L - 1}$.
Examples:
The following example considers the $5$-level scalar quantizer whose characteristic (input × output) curve is depicted in the figure below.
<figure markdown>
![Scalar quantizer example.](/figures/scalar_quantizer_5.svg)
</figure>
The levels are
$$
v_0 = -2, ~ v_1 = -1, ~ v_2 = 0, ~ v_3 = 1, ~ v_4 = 2,
$$
and the thresholds are
$$
t_0 = -\infty, ~ t_1 = -1.5, ~ t_2 = -0.3, ~ t_3 = 0.8, ~ t_4 = 1.4, ~ t_5 = \infty.
$$
>>> quantizer = komm.ScalarQuantizer(levels=[-2.0, -1.0, 0.0, 1.0, 2.0], thresholds=[-1.5, -0.3, 0.8, 1.4])
>>> x = np.linspace(-2.5, 2.5, num=11)
>>> y = quantizer(x)
>>> np.vstack([x, y])
array([[-2.5, -2. , -1.5, -1. , -0.5, 0. , 0.5, 1. , 1.5, 2. , 2.5],
[-2. , -2. , -1. , -1. , -1. , 0. , 0. , 1. , 2. , 2. , 2. ]])
"""
self._levels = np.array(levels, dtype=float)
self._thresholds = np.array(thresholds, dtype=float)
self._num_levels = self._levels.size
levels: npt.NDArray[np.float64] = field(
converter=np.asarray, repr=lambda x: x.tolist()
)
thresholds: npt.NDArray[np.float64] = field(
converter=np.asarray, repr=lambda x: x.tolist()
)

if self._thresholds.size != self._num_levels - 1:
def __attrs_post_init__(self) -> None:
if self.thresholds.size != self.num_levels - 1:
raise ValueError("length of 'thresholds' must be 'num_levels - 1'")

interleaved = np.empty(2 * self._num_levels - 1, dtype=float)
interleaved[0::2] = self._levels
interleaved[1::2] = self._thresholds
interleaved = np.empty(2 * self.num_levels - 1, dtype=float)
interleaved[0::2] = self.levels
interleaved[1::2] = self.thresholds

if not np.array_equal(np.unique(interleaved), interleaved):
raise ValueError("invalid values for 'levels' and 'thresholds'")

@property
def levels(self):
r"""
The quantizer levels $v_0, v_1, \ldots, v_{L-1}$.
"""
return self._levels

@property
def thresholds(self):
r"""
The finite quantizer thresholds $t_1, t_2, \ldots, t_{L-1}$.
"""
return self._thresholds

@property
def num_levels(self):
def num_levels(self) -> int:
r"""
The number of quantization levels $L$.
"""
return self._num_levels

def __call__(self, input_signal):
input_signal_tile = np.tile(
input_signal, reps=(self._thresholds.size, 1)
).transpose()
output_signal = self._levels[
np.sum(input_signal_tile >= self._thresholds, axis=1)
]
return output_signal
return self.levels.size

def __repr__(self):
args = "levels={}, thresholds={}".format(
self._levels.tolist(), self._thresholds.tolist()
)
return "{}({})".format(self.__class__.__name__, args)
def __call__(self, input_signal: npt.ArrayLike) -> npt.NDArray[np.float64]:
tiled = np.tile(input_signal, reps=(self.thresholds.size, 1)).transpose()
output_signal = self.levels[np.sum(tiled >= self.thresholds, axis=1)]
return output_signal
173 changes: 80 additions & 93 deletions src/komm/_quantization/UniformQuantizer.py
Original file line number Diff line number Diff line change
@@ -1,121 +1,108 @@
from functools import cache
from typing import Literal

import numpy as np
import numpy.typing as npt
from attrs import field, frozen

from .ScalarQuantizer import ScalarQuantizer
from .AbstractScalarQuantizer import AbstractScalarQuantizer


class UniformQuantizer(ScalarQuantizer):
@frozen
class UniformQuantizer(AbstractScalarQuantizer):
r"""
Uniform scalar quantizer. It is a [scalar quantizer](/ref/ScalarQuantizer) in which the separation between levels is constant, $\Delta$, and the thresholds are the mid-point between adjacent levels.
Attributes:
num_levels: The number of quantization levels $L$. It must be greater than $1$.
input_peak: The peak of the input signal $x_\mathrm{p}$. The default value is `1.0`.
choice: The choice for the uniform quantizer. Must be one of `'unsigned'` | `'mid-riser'` | `'mid-tread'`. The default value is `'mid-riser'`.
Examples:
>>> quantizer = komm.UniformQuantizer(num_levels=8)
>>> quantizer.levels
array([-0.875, -0.625, -0.375, -0.125, 0.125, 0.375, 0.625, 0.875])
>>> quantizer.thresholds
array([-0.75, -0.5 , -0.25, 0. , 0.25, 0.5 , 0.75])
>>> x = np.linspace(-0.5, 0.5, num=11)
>>> y = quantizer(x)
>>> np.vstack([x, y]) # doctest: +NORMALIZE_WHITESPACE
array([[-0.5 , -0.4 , -0.3 , -0.2 , -0.1 , 0. , 0.1 , 0.2 , 0.3 , 0.4 , 0.5 ],
[-0.375, -0.375, -0.375, -0.125, -0.125, 0.125, 0.125, 0.125, 0.375, 0.375, 0.625]])
>>> quantizer = komm.UniformQuantizer(num_levels=4, input_peak=1.0, choice='unsigned')
>>> quantizer.levels
array([0. , 0.25, 0.5 , 0.75])
>>> quantizer.thresholds
array([0.125, 0.375, 0.625])
>>> quantizer = komm.UniformQuantizer(num_levels=4, input_peak=1.0, choice='mid-riser')
>>> quantizer.levels
array([-0.75, -0.25, 0.25, 0.75])
>>> quantizer.thresholds
array([-0.5, 0. , 0.5])
>>> quantizer = komm.UniformQuantizer(num_levels=4, input_peak=1.0, choice='mid-tread')
>>> quantizer.levels
array([-1. , -0.5, 0. , 0.5])
>>> quantizer.thresholds
array([-0.75, -0.25, 0.25])
"""

def __init__(self, num_levels, input_peak=1.0, choice="mid-riser"):
r"""
Constructor for the class.
Parameters:
num_levels (int): The number of quantization levels $L$.
input_peak (Optional[float]): The peak of the input signal $x_\mathrm{p}$. The default value is `1.0`.
choice (Optional[str]): The choice for the uniform quantizer. Must be one of `'unsigned'` | `'mid-riser'` | `'mid-tread'`. The default value is `'mid-riser'`.
Examples:
>>> quantizer = komm.UniformQuantizer(num_levels=8)
>>> quantizer.levels
array([-0.875, -0.625, -0.375, -0.125, 0.125, 0.375, 0.625, 0.875])
>>> quantizer.thresholds
array([-0.75, -0.5 , -0.25, 0. , 0.25, 0.5 , 0.75])
>>> x = np.linspace(-0.5, 0.5, num=11)
>>> y = quantizer(x)
>>> np.vstack([x, y]) # doctest: +NORMALIZE_WHITESPACE
array([[-0.5 , -0.4 , -0.3 , -0.2 , -0.1 , 0. , 0.1 , 0.2 , 0.3 , 0.4 , 0.5 ],
[-0.375, -0.375, -0.375, -0.125, -0.125, 0.125, 0.125, 0.125, 0.375, 0.375, 0.625]])
>>> quantizer = komm.UniformQuantizer(num_levels=4, input_peak=1.0, choice='unsigned')
>>> quantizer.levels
array([0. , 0.25, 0.5 , 0.75])
>>> quantizer.thresholds
array([0.125, 0.375, 0.625])
>>> quantizer = komm.UniformQuantizer(num_levels=4, input_peak=1.0, choice='mid-riser')
>>> quantizer.levels
array([-0.75, -0.25, 0.25, 0.75])
>>> quantizer.thresholds
array([-0.5, 0. , 0.5])
>>> quantizer = komm.UniformQuantizer(num_levels=4, input_peak=1.0, choice='mid-tread')
>>> quantizer.levels
array([-1. , -0.5, 0. , 0.5])
>>> quantizer.thresholds
array([-0.75, -0.25, 0.25])
"""
delta = (
input_peak / num_levels
if choice == "unsigned"
else 2.0 * input_peak / num_levels
)

if choice == "unsigned":
min_level = 0.0
max_level = input_peak
levels = np.linspace(min_level, max_level, num=num_levels, endpoint=False)
elif choice == "mid-riser":
min_level = -input_peak + (delta / 2) * (num_levels % 2 == 0)
levels = np.linspace(
min_level, -min_level, num=num_levels, endpoint=(num_levels % 2 == 0)
)
elif choice == "mid-tread":
min_level = -input_peak + (delta / 2) * (num_levels % 2 == 1)
levels = np.linspace(
min_level, -min_level, num=num_levels, endpoint=(num_levels % 2 == 1)
)
else:
num_levels: int # TODO: Fix type error
input_peak: float = field(default=1.0)
choice: Literal["unsigned", "mid-riser", "mid-tread"] = field(default="mid-riser")

def __attrs_post_init__(self) -> None:
if self.num_levels < 2:
raise ValueError("number of levels must be greater than 1")
if self.choice not in ["unsigned", "mid-riser", "mid-tread"]:
raise ValueError(
"parameter 'choice' must be in {'unsigned', 'mid-riser', 'mid-tread'}"
)

thresholds = (levels + delta / 2)[:-1]
super().__init__(levels, thresholds)

self._quantization_step = delta
self._input_peak = float(input_peak)
self._choice = choice

@property
def quantization_step(self):
@cache
def levels(self) -> npt.NDArray[np.float64]:
r"""
The quantization step $\Delta$.
The quantizer levels $v_0, v_1, \ldots, v_{L-1}$.
"""
return self._quantization_step
nl, xp, delta = self.num_levels, self.input_peak, self.quantization_step
if self.choice == "unsigned":
min_level, max_level = 0.0, xp
return np.linspace(min_level, max_level, num=nl, endpoint=False)
elif self.choice == "mid-riser":
min_level = -xp + (delta / 2) * (nl % 2 == 0)
return np.linspace(min_level, -min_level, num=nl, endpoint=(nl % 2 == 0))
else: # self.choice == "mid-tread"
min_level = -xp + (delta / 2) * (nl % 2 == 1)
return np.linspace(min_level, -min_level, num=nl, endpoint=(nl % 2 == 1))

@property
def input_peak(self):
@cache
def thresholds(self) -> npt.NDArray[np.float64]:
r"""
The peak of the input signal $x_\mathrm{p}$.
The quantizer finite thresholds $t_1, t_2, \ldots, t_{L-1}$.
"""
return self._input_peak
return (self.levels + self.quantization_step / 2)[:-1]

@property
def choice(self):
@cache
def quantization_step(self) -> float:
r"""
The choice for the uniform quantizer (`'unsigned'` | `'mid-riser'` | `'mid-tread'`).
The quantization step $\Delta$.
"""
return self._choice
d = self.input_peak / self.num_levels
return d if self.choice == "unsigned" else 2.0 * d

def __call__(self, input_signal):
def __call__(self, input_signal: npt.ArrayLike) -> npt.NDArray[np.float64]:
input_signal = np.array(input_signal, dtype=float, ndmin=1)
delta = self._quantization_step
if self._choice in ["unsigned", "mid-tread"]:
delta = self.quantization_step
if self.choice in ["unsigned", "mid-tread"]:
quantized = delta * np.floor(input_signal / delta + 0.5)
else: # self._choice == "mid-riser"
else: # self.choice == "mid-riser"
quantized = delta * (np.floor(input_signal / delta) + 0.5)
output_signal = np.clip(
quantized, a_min=self._levels[0], a_max=self._levels[-1]
)
output_signal = np.clip(quantized, a_min=self.levels[0], a_max=self.levels[-1])
return output_signal

def __repr__(self):
args = "num_levels={}, input_peak={}, choice='{}'".format(
self._num_levels, self._input_peak, self._choice
)
return "{}({})".format(self.__class__.__name__, args)
13 changes: 13 additions & 0 deletions tests/quantization/test_scalar_quantizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import numpy as np

import komm


def test_scalar_quantizer():
quantizer = komm.ScalarQuantizer(levels=[-1.0, 0.0, 1.2], thresholds=[-0.5, 0.8])
assert np.allclose(
quantizer([-1.01, -1.0, -0.99, -0.51, -0.5, -0.49]), [-1, -1, -1, -1, 0, 0]
)
assert np.allclose(quantizer([0.8, -0.5]), [1.2, 0.0])
assert np.allclose(quantizer([-1.0, 0.0, 1.2]), [-1.0, 0.0, 1.2])
assert np.allclose(quantizer([-np.inf, np.inf]), [-1.0, 1.2])
Loading

0 comments on commit 437bf67

Please sign in to comment.