Skip to content

Commit

Permalink
test/feat/refactor: added preliminary global/local noise estimation f…
Browse files Browse the repository at this point in the history
…unction with tests; minor refactoring of finite differences; improved finite difference tests model handling
  • Loading branch information
MothNik committed May 20, 2024
1 parent 395c7c0 commit dd4cc7a
Show file tree
Hide file tree
Showing 6 changed files with 667 additions and 23 deletions.
337 changes: 333 additions & 4 deletions chemotools/utils/_finite_differences.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
from math import comb
from numbers import Integral
"""
This utility submodule provides functions for the computation of forward finite
differences, namely
- the kernel for forward and central finite differences,
- computation of related kernel matrices
- estimation of the noise standard deviation of a series
"""

### Imports ###

from math import comb, factorial
from numbers import Integral, Real
from typing import Any, Callable, Literal, Optional, Tuple, Union

import numpy as np
from scipy.ndimage import median_filter
from sklearn.utils import check_scalar

### Constants ###

_MAD_PREFACTOR = 1.482602

### Functions ###


def calc_forward_diff_kernel(
*,
Expand Down Expand Up @@ -36,7 +56,7 @@ def calc_forward_diff_kernel(
Raises
------
ValueError
If the difference order is below 1.
If ``differences`` is below 1.
"""
# the input is validated
Expand All @@ -58,6 +78,95 @@ def calc_forward_diff_kernel(
)


def calc_central_diff_kernel(*, differences: int, accuracy: int = 2) -> np.ndarray:
"""
Computes the kernel for central finite differences which can be applied to a
series by means of a convolution, e.g.,
```python
kernel = calc_central_fin_diff_kernel(differences=2, accuracy=2)
differences = np.convolve(series, np.flip(kernel), mode="valid")
# NOTE: NumPy flips the kernel internally due to the definition of convolution
```
Parameters
----------
differences : int
The order of the differences starting from 0 for the original curve, 1 for the
first order, 2 for the second order, ..., and ``m`` for the ``m``-th order
differences.
Values below 1 are not allowed.
accuracy : int, default=2
The accuracy of the finite difference approximation, which has to be an even
integer ``>= 2``.
The higher the accuracy, the better the approximation.
Returns
-------
fin_diff_kernel : ndarray of shape (kernel_size,)
A NumPy-1D-vector resembling the kernel from the code example above. Since the
elements are not necessarily integers, the data type is ``np.float64``.
Its size is given by ``2 * floor((differences + 1) / 2) - 1 + accuracy`` where
``floor`` returns the next lower integer.
Raises
------
ValueError
If ``differences`` is below 1.
ValueError
If ``accuracy`` is not an even integer ``>= 2``.
References
----------
The computation is based on the description in [1]_.
.. [1] Wikipedia, "Finite difference coefficient - Central finite difference",
URL: https://en.wikipedia.org/wiki/Finite_difference_coefficient#Central_finite_difference
""" # noqa: E501

### Input Validation ###

# first, difference order and accuracy are validated
check_scalar(
differences,
name="differences",
target_type=Integral,
min_val=1,
include_boundaries="left",
)

check_scalar(
accuracy,
name="accuracy",
target_type=Integral,
min_val=2,
include_boundaries="left",
)
if accuracy % 2 == 1:
raise ValueError("Got accuracy = {accuracy}, expected an even integer.")

### Central Difference Kernel Computation ###

# first, the size of the kernel is computed
kernel_size = 2 * ((differences + 1) // 2) - 1 + accuracy
half_kernel_size = kernel_size // 2

# then, the linear system to solve for the coefficients is set up
grid_vect = np.arange(
start=-half_kernel_size,
stop=half_kernel_size + 1,
step=1,
dtype=np.int64,
)
lhs_mat = np.vander(grid_vect, N=kernel_size, increasing=True).transpose()
rhs_vect = np.zeros(shape=(kernel_size,), dtype=np.int64)
rhs_vect[differences] = factorial(differences)

# the coefficients are computed by solving the linear system
return np.linalg.solve(lhs_mat, rhs_vect)


def _gen_squ_fw_fin_diff_mat_cho_banded_transp_first(
*,
n_data: int,
Expand Down Expand Up @@ -199,7 +308,7 @@ def gen_squ_fw_fin_diff_mat_cho_banded(
If ``n_data`` is below ``differences + 1``, i.e., the kernel does not fit into
the data at least once.
ValueError
If the difference order is below 1.
If ``differences`` is below 1.
Notes
-----
Expand Down Expand Up @@ -262,3 +371,223 @@ def gen_squ_fw_fin_diff_mat_cho_banded(
n_data=n_data,
differences=differences,
)


def estimate_noise_stddev(
series: np.ndarray,
differences: int = 6,
diff_accuracy: int = 2,
window_size: Optional[int] = None,
extrapolator: Callable[..., np.ndarray] = np.pad,
extrapolator_args: Tuple[Any, ...] = ("reflect",),
power: Literal[-2, -1, 1, 2] = 1,
stddev_min: Union[float, int] = 1e-10,
) -> np.ndarray:
"""
EXPERIMENTAL FEATURE
Estimates the local/global noise standard deviation of a series even in the presence
of trends, like baselines and peaks, as well as outliers by using forward finite
differences.
Please see the Notes section for further details.
Parameters
----------
series : ndarray of shape (n_data,)
The series for which the noise standard deviation is estimated.
differences : int, default=6
The order of the differences starting from 0 for the original curve, 1 for the
first order, 2 for the second order, ..., and ``m`` for the ``m``-th order
differences.
Empirically, 5-6 was found as a sweet spot, but even numbers work better with
the default ``extrapolator``.
Values below 1 are not allowed.
diff_accuracy : int, default=2
The accuracy of the finite difference approximation, which has to be an even
integer ``>= 2``.
window_size : int or None, default=None
The odd window size around a datapoint to estimate its local noise standard
deviation.
Higher values will lead to a smoother noise standard deviation estimate by
sacrificing the local resolution. At the same time, edge effects start blurring
in if the ``extrapolator`` does not provide a good extrapolation.
If provided, it has to be at least 1.
If ``None``, the global noise standard deviation is estimated, i.e., it will
be the same for each data point.
extrapolator : callable, default=np.pad
The extrapolator function that is used to pad the series before the finite
differences and the median filter are applied. It will pad the signal with
``pad_width = (diff_kernel_size // 2) + (window_size // 2)`` elements on each
side where ``diff_kernel_size`` is the size of the central finite differences
kernel (see the Notes for details).
It has to be a callable with the following signature:
```python
series_extrap = extrapolator(series, pad_width, *extrapolator_args)
```
If ``window_size`` is ``None``, only the central finite differences kernel is
considered.
By default, the signal is padded by reflecting ``series`` at the edges on either
side, but of course the quality of the noise estimation can be improved by using
a more sophisticated extrapolation method.
extrapolator_args : tuple, default=("reflect",)
Additional arguments that are passed to the extrapolator function as described
for ``extrapolator``.
power : {-2, -1, 1, 2}, default=1
The power to which the noise standard deviation is raised.
This can be used to compute the:
- original noise standard deviation (``power=1``),
- the noise variance (``power=2``),
- the inverse noise standard deviation (``power=-1``), or
- the inverse noise variance (``power=-2``; typically used as weights).
stddev_min : float or int, default=1e-10
The minimum noise standard deviation that is allowed.
Any estimated noise standard deviation below this value will be set to this
value.
It must be at least ``1e-15``.
Returns
-------
noise_stddev : ndarray of shape (n_data,)
The estimated noise standard deviation raised to ``power`` for each data point
in the series.
Raises
------
ValueError
If ``series.size`` is below less than the kernel or window size (see Notes for
details).
ValueError
If ``differences`` is below 1.
ValueError
If ``diff_accuracy`` is not an even integer ``>= 2``.
ValueError
If ``window_size`` is below 1.
References
----------
The estimation algorithm is an adaption of the global estimation logic applied for
the "DER SNR" proposed in [1]_ (see the Notes for further details).
.. [1] Stoehr F., et al., "DER SNR: A Simple & General Spectroscopic Signal-to-Noise
Measurement Algorithm", Astronomical Data Analysis Software and Systems XVII P5.4
ASP Conference Series, Vol. XXX, 2008
Notes
-----
The "DER SNR" algorithm estimates a global noise level in a robust fashion by
applying a modified version of the Median Absolute Deviation (MAD) to the
derivative/differences of the signal. By using a moving MAD filter, the local noise
level can be estimated as well.
The algorithms does not work well for signals that are perfectly noise-free.
The kernel size for the central finite difference kernel is given by
``2 * floor((differences + 1) / 2) - 1 + diff_accuracy``.
"""

### Input Validation ###

# first, the window size, power, and minimum standard deviation are validated
# NOTE: the difference order and accuracy are by the central finite differences
# kernel function
# window size
if window_size is not None:
check_scalar(
window_size,
name="window_size",
target_type=Integral,
min_val=1,
include_boundaries="left",
)
if window_size % 2 == 0:
raise ValueError(
"Got window_size = {window_size}, expected an odd integer."
)

# power
if power not in {-2, -1, 1, 2}:
raise ValueError(f"Got power = {power}, expected -2, -1, 1, or 2.")

# minimum standard deviation
check_scalar(
stddev_min,
name="stddev_min",
target_type=Real,
min_val=1e-15,
include_boundaries="left",
)

# for validation of the series, the central finite differences kernel has to be
# computed
diff_kernel = calc_central_diff_kernel(
differences=differences,
accuracy=diff_accuracy,
)

# afterwards, the series is validated
if series.size < diff_kernel.size:
raise ValueError(
f"Got series.size = {series.size}, must be >= {diff_kernel.size} (kernel "
f"size)."
)

if window_size is not None:
if series.size < window_size:
raise ValueError(
f"Got series.size = {series.size}, must be >= {window_size} (window "
"size)."
)

### Noise Standard Deviation Estimation ###

# the signal is extrapolated to avoid edge effects
pad_width = diff_kernel.size // 2
pad_width += 0 if window_size is None else window_size // 2
series_extrap = extrapolator(
series,
pad_width,
*extrapolator_args,
)

# the absolute forward finite differences are computed ...
abs_diff_series = np.abs(
np.convolve(series_extrap, np.flip(diff_kernel), mode="valid")
)
size_after_diff = abs_diff_series.size

# ... and the median filter is applied to theses differences
prefactor = _MAD_PREFACTOR / np.linalg.norm(diff_kernel)
# Case 1: the global noise standard deviation is estimated
if window_size is None:
noise_stddev = np.full_like(
series,
fill_value=prefactor * np.median(abs_diff_series),
)

# Case 2: the local noise standard deviation is estimated
else:
half_window_size = window_size // 2
noise_stddev = (
prefactor
* median_filter(
abs_diff_series,
size=window_size,
mode="constant",
)[half_window_size : size_after_diff - half_window_size]
)

# the minimum-bounded noise standard deviation is raised to the power
noise_stddev = np.maximum(noise_stddev, stddev_min)

if power in {-2, 2}:
noise_stddev = np.square(noise_stddev)

if power in {-2, -1}:
noise_stddev = np.reciprocal(noise_stddev)

return noise_stddev
Loading

0 comments on commit dd4cc7a

Please sign in to comment.