Skip to content

Commit

Permalink
Merge pull request #14 from LRydin/dev
Browse files Browse the repository at this point in the history
Various simplifications and improvements.
 - `km` zero is now normalised
 - `bins` can be given as an `int`, a `list` of `int`s, e.g., for 2-D, `[50, 50]`. Arrays can be specified for each dimension, i.e., `[np.array, np.array]`
 - `powers` has been simplified, such that simply an `int` with the maximum power can be given, e.g., `power=8`.
  • Loading branch information
LRydin authored Aug 25, 2023
2 parents 4d18423 + a0ff260 commit 57a64ee
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 122 deletions.
45 changes: 19 additions & 26 deletions kramersmoyal/kernels.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def kernel(kernel_func):
"""
@wraps(kernel_func) # just for naming
def decorated(x, bw):
def volume_unit_ball(dims: int):
# volume of a unit ball in dimension dims
return np.pi ** (dims / 2.0) / gamma(dims / 2.0 + 1.0)

if len(x.shape) == 1:
x = x.reshape(-1, 1)

Expand All @@ -24,27 +28,21 @@ def decorated(x, bw):
# Euclidean norm
dist = np.sqrt((x * x).sum(axis=-1))

return kernel_func(dist / bw, dims) / (bw ** dims)
return kernel_func(dist / bw, dims) / (bw ** dims) / volume_unit_ball(dims)
return decorated


def volume_unit_ball(dims: int) -> float:
"""
Returns the volume of a unit ball in dimensions dims.
"""
return np.pi ** (dims / 2.0) / gamma(dims / 2.0 + 1.0)

@kernel
def epanechnikov(x: np.ndarray, dims: int) -> np.ndarray:
"""
The Epanechnikov kernel in dimensions dims.
"""
normalisation = 2.0 / (dims + 2.0)
x2 = (x ** 2)
mask = x2 < 1.0
kernel = np.zeros_like(x)
kernel[mask] = (1.0 - x2[mask])
normalisation = 2.0 / (dims + 2.0) * volume_unit_ball(dims)
return kernel / normalisation
kernel[mask] = (1.0 - x2[mask]) / normalisation
return kernel

@kernel
def gaussian(x: np.ndarray, dims: int) -> np.ndarray:
Expand All @@ -56,9 +54,9 @@ def gaussian_integral(n):
return np.sqrt(np.pi * 2) * factorial2(n - 1) / 2
elif n % 2 == 1:
return np.sqrt(np.pi * 2) * factorial2(n - 1) * norm.pdf(0)
kernel = np.exp(-x ** 2 / 2.0)
normalisation = dims * gaussian_integral(dims - 1) * volume_unit_ball(dims)
return kernel / normalisation
normalisation = dims * gaussian_integral(dims - 1)
kernel = np.exp(-x ** 2 / 2.0) / normalisation
return kernel

@kernel
def uniform(x: np.ndarray, dims: int) -> np.ndarray:
Expand All @@ -68,40 +66,35 @@ def uniform(x: np.ndarray, dims: int) -> np.ndarray:
mask = x < 1.0
kernel = np.zeros_like(x)
kernel[mask] = 1.0
normalisation = volume_unit_ball(dims)
return kernel / normalisation
return kernel

@kernel
def triagular(x: np.ndarray, dims: int) -> np.ndarray:
"""
Triagular kernel in dimensions dims.
"""
normalisation = 1.0 / 2.0
mask = x < 1.0
kernel = np.zeros_like(x)
kernel[mask] = 1.0 - np.abs(x[mask])
normalisation = volume_unit_ball(dims) / 2.0
return kernel / normalisation
kernel[mask] = (1.0 - np.abs(x[mask])) / normalisation
return kernel


@kernel
def quartic(x: np.ndarray, dims: int) -> np.ndarray:
"""
Quartic, or biweight kernel in dimensions dims.
"""
normalisation = 2.0 / (dims + 2.0)
x2 = (x ** 2)
mask = x2 < 1.0
kernel = np.zeros_like(x)
kernel[mask] = ((1.0 - x2[mask]) ** 2)
normalisation = 2.0 / (dims + 2.0) * volume_unit_ball(dims)
return kernel / normalisation


_kernels = {epanechnikov, gaussian, uniform, triagular, quartic}
kernel[mask] = ((1.0 - x2[mask]) ** 2) / normalisation
return kernel


def silvermans_rule(timeseries: np.ndarray) -> float:
n, dims = timeseries.shape
sigma = np.std(timeseries, axis=0)
sigma = sigma.max()

return ( (4.0 * sigma**5) / (3 * n)) ** (1 / 5)
return ((4.0 * sigma ** 5) / (3 * n)) ** (1 / 5)
187 changes: 119 additions & 68 deletions kramersmoyal/kmc.py
Original file line number Diff line number Diff line change
@@ -1,115 +1,177 @@
import numpy as np
from scipy.signal import convolve
from scipy.special import factorial
from itertools import product

from .binning import histogramdd
from .kernels import silvermans_rule, epanechnikov, _kernels
from .kernels import silvermans_rule, epanechnikov

def km(timeseries: np.ndarray, bins: np.ndarray, powers: np.ndarray,
kernel: callable=None, bw: float=None, tol: float=1e-10,
conv_method: str='auto') -> np.ndarray:
def km(timeseries: np.ndarray, bins: str='default', powers: int=4,
kernel: callable=epanechnikov, bw: float=None, tol: float=1e-10,
conv_method: str='auto', center_edges: bool=True) -> np.ndarray:
"""
Estimates the Kramers─Moyal coefficients from a timeseries using a kernel
estimator method. ``km`` can calculate the Kramers─Moyal coefficients for a
estimator method. `km` can calculate the Kramers─Moyal coefficients for a
timeseries of any dimension, up to any desired power.
Parameters
----------
timeseries: np.ndarray
The D-dimensional timeseries ``(N, D)``. The timeseries of length ``N``
and dimensions ``D``.
bins: np.ndarray
The number of bins for each dimension. This is the underlying space for
the Kramers-Moyal coefficients. In 1-dimension a choice as
``bins = np.array([6000])``
is recommended. In 2-dimensions
``bins = np.array([300,300])``
is recommended.
powers: np.ndarray
Powers for the operation of calculating the Kramers─Moyal coefficients,
which need to match dimensions of the timeseries. In 1-dimension the
first four Kramers-Moyal coefficients can be found via
``powers = np.array([0],[1],[2],[3],[4])``.
In 2 dimensions take into account each dimension, as
::
powers = np.array([0,0],[0,1],[1,0],[1,1],[0,2],[2,0],[2,2],
[0,3],[3,0],[3,3],[0,4],[4,0],[4,4])
kernel: callable (default ``None``)
Kernel used to convolute with the Kramers-Moyal coefficients. To select
for example an Epanechnikov kernel use
The D-dimensional timeseries `(N, D)`. The timeseries of length `N`
and dimensions `D`.
bins: int or list or np.ndarray or string (default `default`)
The number of bins. This is the underlying space for the Kramers─Moyal
coefficients to be estimated. If desired, bins along each dimension can
be given as monotonically increasing bin edges (tuple or list), e.g.,
* in 1-D, `(np.linspace(lower, upper, length),)`;
* in 2-D, `(np.linspace(lower_x, upper_x, length_x),
np.linspace(lower_y, upper_y, length_y))`,
with desired `lower` and `upper` ranges (in each dimension).
If default, the bin numbers for different dimensions are:
* 1-D, 5000;
* 2-D, 100×100;
* 3-D, 25×25×25.
The bumber of bins along each dimension can be specified, e.g.,
* 2-D, `[125, 75]`,
* 3-D, `[100, 80, 120]`.
``kernel = kernels.epanechnikov``
If `bins` is int, or a list or np.array of dimension 1, and the
`timeseries` dimension is `D`, then `int(bins**(1/D))`.
If ``None`` the Epanechnikov kernel will be used.
powers: int or list or tuple or np.ndarray (default `4`)
Powers for the operation of calculating the Kramers─Moyal coefficients.
Default is the largest power used, e.g., if `4`, then `(0, 1, 2, 3, 4)`.
They can be specified, matching the dimensions of the timeseries. E.g.,
in 1-dimension the first four Kramers─Moyal coefficients can be given as
`powers=(0, 1, 2, 3, 4)`, which is the same as `powers=4`. Setting
`powers=p` for higher dimensions will results in all possible
combinations up to the desired power 'p', e.g.
bw: float (default ``None``)
* 2-D, `powers=2` results in
powers = np.array([[0, 0, 1, 1, 0, 1, 2, 2, 2],
[0, 1, 0, 1, 2, 2, 0, 1, 2]]).T
Set `verbose=True` to print out `powers`. The order that they appear
dictactes the order in the output `kmc`.
kernel: callable (default `epanechnikov`)
Kernel used to convolute with the Kramers-Moyal coefficients. To select
for example a Gaussian kernel use
`kernel = kernels.gaussian`
bw: float (default `None`)
Desired bandwidth of the kernel. A value of 1 occupies the full space of
the bin space. Recommended are values ``0.005 < bw < 0.5``.
the bin space. Recommended are values `0.005 < bw < 0.5`.
tol: float (default ``1e-10``)
Round to zero absolute values smaller than ``tol``, after the
tol: float (default `1e-10`)
Round to zero absolute values smaller than `tol`, after the
convolutions.
conv_method: str (default ``auto``)
conv_method: str (default `auto`)
A string indicating which method to use to calculate the convolution.
https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.convolve.html
center_edges: bool (default `True`)
Whether to return the bin centers or the bin edges (since for `n` bins
there are `n + 1` edges).
Returns
-------
kmc: np.ndarray
The calculated Kramers-Moyal coefficients in accordance to the
timeseries dimensions in (D,bins.shape) shape. To extract the selected
orders of the kmc, use kmc[i,...], with i the order according to powers
The calculated Kramers─Moyal coefficients in accordance to the
timeseries dimensions in `(D, bins.shape)` shape. To extract the
selected orders of the kmc, use `kmc[i,...]`, with `i` the order
according to powers.
edges: np.ndarray
The bin edges with shape (D,bins.shape) of the calculated Kramers-Moyal
coefficients
The bin edges with shape `(D, bins.shape)` of the estimated
Kramers─Moyal coefficients.
References
----------
.. [Lamouroux2009] D. Lamouroux and K. Lehnertz, "Kernel-based regression of
drift and diffusion coefficients of stochastic processes." Physics Letters A
373(39), 3507─3512, 2009.
"""

# Check finiteness, dimensions, and existence of the time series
timeseries = np.asarray_chkfinite(timeseries, dtype=float)
if len(timeseries.shape) == 1:
timeseries = timeseries.reshape(-1, 1)

assert len(timeseries.shape) == 2, "Timeseries must (n, dims) shape"
# safety check, if data not in vertical (N, dims)
assert timeseries.shape[1] < timeseries.shape[0], \
"Timeseries seems to be (D, N) shaped, transpose it: Timeseries.T"

assert len(timeseries.shape) == 2, "Timeseries must be (N, D) shape"
assert timeseries.shape[0] > 0, "No data in timeseries"

n, dims = timeseries.shape

# Tranforming powers into right shape
if isinstance(powers, int):
# complicated way of obtaing power in all dimensions
powers = np.array(sorted(product(*(range(powers + 1),) * dims),
key=lambda x: (max(x), x)))

powers = np.asarray_chkfinite(powers, dtype=float)
if len(powers.shape) == 1:
powers = powers.reshape(-1, 1)

trim_output = False
if not (powers[0] == [0] * dims).all():
powers = np.array([[0] * dims, *powers])
trim_output = True
else:
trim_output = False

assert (powers[0] == [0] * dims).all(), "First power must be zero"
assert dims == powers.shape[1], "Powers not matching timeseries' dimension"
assert dims == bins.shape[0], "Bins not matching timeseries' dimension"

# Check and adjust bins
if isinstance(bins, str):
if bins == 'default':
bins = [5000] if dims == 1 else bins
bins = [100] * 2 if dims == 2 else bins
bins = [25] * 3 if dims == 3 else bins
assert dims < 4, "If dimension of timeseries > 3, set bins manually"

if isinstance(bins, int):
bins = [int(bins**(1/dims))] * dims

if isinstance(bins, (list, tuple)):
assert all(isinstance(ele, (int, np.ndarray)) for ele in bins), \
"list or tuples of bins must either be ints or arrays"

# bins = np.asarray_chkfinite(bins, dtype=int)
assert dims == len(bins), "Bins not matching timeseries' dimension"

if bw is None:
bw = silvermans_rule(timeseries)
elif callable(bw):
bw = bw(timeseries)
assert bw > 0.0, "Bandwidth must be > 0"

if kernel is None:
kernel = epanechnikov
assert kernel in _kernels, "Kernel not found"

# This is where the calculations take place
kmc, edges = _km(timeseries, bins, powers, kernel, bw, tol, conv_method)

if center_edges:
edges = [edge[:-1] + 0.5 * (edge[1] - edge[0]) for edge in edges]

return (kmc, edges) if not trim_output else (kmc[1:], edges)


def _km(timeseries: np.ndarray, bins: np.ndarray, powers: np.ndarray,
kernel: callable, bw: float, tol: float, conv_method: str) -> np.ndarray:
kernel: callable, bw: float, tol: float,
conv_method: str) -> np.ndarray:
"""
Helper function for km that does the heavy lifting and actually estimates
Helper function for `km` that does the heavy lifting and actually estimates
the Kramers─Moyal coefficients from the timeseries.
"""
def cartesian_product(arrays: np.ndarray):
Expand All @@ -118,16 +180,7 @@ def cartesian_product(arrays: np.ndarray):
arr = np.empty([len(a) for a in arrays] + [la], dtype=np.float64)
for i, a in enumerate(np.ix_(*arrays)):
arr[..., i] = a
return arr.reshape(-1, la)

def kernel_edges(edges: np.ndarray):
# Generates the kernel edges
edges_k = list()
for edge in edges:
dx = edge[1] - edge[0]
L = edge.size
edges_k.append(np.linspace(-dx * L, dx * L, int(2 * L + 1)))
return edges_k
return arr

# Calculate derivative and the product of its powers
grads = np.diff(timeseries, axis=0)
Expand All @@ -137,11 +190,9 @@ def kernel_edges(edges: np.ndarray):
hist, edges = histogramdd(timeseries[:-1, ...], bins=bins,
weights=weights, bw=bw)

# Generate centred kernel
edges_k = kernel_edges(edges)
mesh = cartesian_product(edges_k)
kernel_ = kernel(mesh, bw=bw).reshape(*(edge.size for edge in edges_k))
kernel_ /= np.sum(kernel_)
# Generate centred kernel on larger grid (fft'ed convolutions are circular)
edges_k = [(e[1] - e[0]) * np.arange(-e.size, e.size + 1) for e in edges]
kernel_ = kernel(cartesian_product(edges_k), bw=bw)

# Convolve weighted histogram with kernel and trim it
kmc = convolve(hist, kernel_[None, ...], mode='same', method=conv_method)
Expand All @@ -152,4 +203,4 @@ def kernel_edges(edges: np.ndarray):
taylors = np.prod(factorial(powers[1:]), axis=1)
kmc[1:, ~mask] /= taylors[..., None] * kmc[0, ~mask]

return kmc, [edge[:-1] + 0.5 * (edge[1] - edge[0]) for edge in edges]
return kmc, edges
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="kramersmoyal",
version="0.4",
version="0.4.1",
author="Leonardo Rydin Gorjao and Francisco Meirinhos",
author_email="leonardo.rydin@gmail.com",
description="Calculate Kramers-Moyal coefficients for stochastic process of any dimension, up to any order.",
Expand Down
Loading

0 comments on commit 57a64ee

Please sign in to comment.