Skip to content

Commit

Permalink
stuart-landau-kuramoto oscilator
Browse files Browse the repository at this point in the history
  • Loading branch information
djpasseyjr committed Feb 2, 2024
1 parent 78f165e commit 6ebc5fe
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 3 deletions.
2 changes: 1 addition & 1 deletion interfere/dynamics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
StandardTNoise
)
from .geometric_brownian_motion import GeometricBrownianMotion
from .kuramoto import Kuramoto, KuramotoSakaguchi
from .kuramoto import Kuramoto, KuramotoSakaguchi, StuartLandauKuramoto
from .lotka_voltera import LotkaVoltera, LotkaVolteraSDE
from .ornstein_uhlenbeck import OrnsteinUhlenbeck
from .quadratic_sdes import Belozyorov3DQuad, Liping3DQuadFinance
Expand Down
2 changes: 1 addition & 1 deletion interfere/dynamics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def simulate(
the n rows correspond to the n times in `time_points`.
"""
m = len(time_points)
X_do = np.zeros((m, self.dim))
X_do = np.zeros((m, self.dim), dtype=initial_condition.dtype)

# Optionally apply intervention to initial condition
if intervention is not None:
Expand Down
139 changes: 139 additions & 0 deletions interfere/dynamics/kuramoto.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,23 @@
from typing import Optional, Callable

import numpy as np
from pyclustering.nnet import conn_type
from pyclustering.nnet.fsync import fsync_network

from .base import StochasticDifferentialEquation, DEFAULT_RANGE
from ..utils import copy_doc


# Maps string arguments to pyclustering arguments
CONN_TYPE_MAP = {
"all_to_all": conn_type.ALL_TO_ALL,
"grid_four": conn_type.GRID_FOUR,
"grid_eight": conn_type.GRID_EIGHT,
"list_bdir": conn_type.LIST_BIDIR,
"dynamic": conn_type.DYNAMIC
}


def kuramoto_intervention_wrapper(
intervention: Callable[[np.ndarray, float], np.ndarray]
) -> Callable[[np.ndarray, float], np.ndarray]:
Expand Down Expand Up @@ -194,3 +206,130 @@ def drift(self, theta: np.ndarray, t: float):
return self.omega + prefactor * (
self.adjacency_matrix * np.sin(
theta_j - theta_i - self.phase_frustration)).dot(one)


class StuartLandauKuramoto(StochasticDifferentialEquation):

def __init__(
self,
omega: np.ndarray,
rho: np.ndarray,
K: float,
sigma: float = 0,
type_conn = "all_to_all",
convert_to_real = True,
measurement_noise_std: Optional[np.ndarray] = None
):
"""
Model of an oscillatory network that uses Landau-Stuart oscillator and Kuramoto model as a synchronization mechanism.
The dynamics of each oscillator in the network is described by following differential Landau-Stuart equation with feedback:
dz_j/dt = (i omega_j + rho_j^2 - |z_j|^2) z_j # Stuart-landau part
+ (K/N) sum_{k=0}^N A_jk (z_k - z_j) # Kuramoto part
where i is the complex number, omega_j is the natural frequency, rho_j
is the radius.
Args:
omega (np.ndarray): 1D array of natural frequencies
rho (np.ndarray): Radius of oscillators that affects amplitude. 1D
array with the same length as omega
K (float): Coupling strength between oscillators.
sigma (float): Scale of the brownian increments. Model is
deterministic when sigma == 0.
type_conn (str): Type of connection between oscillators. One
of ["all_to_all", "grid_four", "grid_eight", "list_bdir",
"dynamic"]. See pyclustering.nnet.__init__::conn_type for
details.
convert_to_real (bool): If true, self.simulate returns only the
real part or the time series.
measurement_noise_std (ndarray): None, or a vector with shape (n,)
where each entry corresponds to the standard deviation of the
measurement noise for that particular dimension of the dynamic
model. For example, if the dynamic model had two variables x1
and x2 and measurement_noise_std = [1, 10], then independent
gaussian noise with standard deviation 1 and 10 will be added to
x1 and x2 respectively at each point in time.
"""
dim = len(omega)
if len(rho) != dim:
raise ValueError("omega and rho arguments must have the same size.")

self.omega = omega
self.rho = rho
self.K = K
self.Sigma = sigma * np.diag(np.ones(dim))
self.type_conn = type_conn
self.convert_to_real = convert_to_real

# Initialize the pyclustering model.
self.pyclustering_model = fsync_network(
dim, omega, rho, K, CONN_TYPE_MAP[type_conn])

super().__init__(dim, measurement_noise_std)

@copy_doc(StochasticDifferentialEquation.simulate)
def simulate(
self,
initial_condition: np.ndarray,
time_points: np.ndarray,
intervention: Optional[Callable[[np.ndarray, float], np.ndarray]]= None,
rng: np.random.mtrand.RandomState = DEFAULT_RANGE,
dW: Optional[np.ndarray] = None,
) -> np.ndarray:
# Must have a complex initial condition.
z0 = np.array(initial_condition, dtype=np.complex128)
Z_do = super().simulate(
z0,
time_points,
intervention=intervention,
rng=rng,
dW=dW
)
if self.convert_to_real:
Z_do = np.real(Z_do)
return Z_do

def drift(self, z: np.ndarray, t: float):
"""Deterministic part of Stuart-Landau-Kuramoto dynamics.
The pyclustering.nnet.fsync model uses an internal amplitude attribute
(which is the observed node states) to compute the kuramoto
synchronization. This internal amplitude is only
updated on observed timesteps, however, the ode solver is used at a
small scale to perform updates BETWEEN timesteps with neighbor amplitude
held constant and equal to the stored amplitude.
We overwrite fsync_network.__amplitude here to compute instinataneous
dynamics without the update delay that is built into the pyclustering
model.
Args:
z (complex np.ndarray): 1d array of current state. Complex numbers.
t (float): Current time.
"""
z_column = z.reshape(-1, 1)
# In pyclustering.nnet.fysnc.fsync_dynamic.simulate
self.pyclustering_model._fsync_dynamic__amplitude = z_column

# The function _fsync_network__calculate_amplitude accepts and returns
# 2 float64s to represent the complex numbers. We convert
# the imaginary numbers to 2 floats before passing to the function.
z_2d_float = z_column.view(np.float64)

# We call the function on each node. Then we stack the 2D outputs into
# A (self.dim x 2) array.
dz_2d = np.vstack([
self.pyclustering_model._fsync_network__calculate_amplitude(
z_2d_float[node_index, :], t, node_index)
for node_index in range(self.dim)
])

# Last we convert the real representation to a 1D imaginary array
dz = dz_2d.view(np.complex128)[:, 0]
return dz

def noise(self, x: np.ndarray, t: float):
"""Independent noise matrix scaled by scalar sigma in self.__init__."""
return self.Sigma
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ authors = [
dependencies = [
"numpy",
"pandas",
"pyclustering@git+https://github.com/djpasseyjr/pyclustering",
"pysindy",
"scikit-learn",
"sktime",
Expand Down
4 changes: 3 additions & 1 deletion tests/test_dynamics.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,12 @@ def test_kuramoto():
K = 0.7
A = rng.random((10, 10)) < .3
sigma=0.1
error_std = np.ones(10)
rho = np.random.rand(10)

# Run standard checks for interfere.base.DynamicModel objects.
model = interfere.dynamics.Kuramoto(omega, K, A, sigma)
check_simulate_method(model)
model = interfere.dynamics.KuramotoSakaguchi(omega, K, A, A, sigma)
check_simulate_method(model)
model = interfere.dynamics.StuartLandauKuramoto(omega, rho, K, sigma)
check_simulate_method(model)

0 comments on commit 6ebc5fe

Please sign in to comment.