Skip to content

Commit

Permalink
kuramoto models and standardized model tests
Browse files Browse the repository at this point in the history
  • Loading branch information
djpasseyjr committed Jan 31, 2024
1 parent 6057ae8 commit 78f165e
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 82 deletions.
1 change: 1 addition & 0 deletions interfere/dynamics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
StandardTNoise
)
from .geometric_brownian_motion import GeometricBrownianMotion
from .kuramoto import Kuramoto, KuramotoSakaguchi
from .lotka_voltera import LotkaVoltera, LotkaVolteraSDE
from .ornstein_uhlenbeck import OrnsteinUhlenbeck
from .quadratic_sdes import Belozyorov3DQuad, Liping3DQuadFinance
Expand Down
10 changes: 8 additions & 2 deletions interfere/dynamics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def simulate(

# Optionally add measurement noise
if self.measurement_noise_std is not None:
X_do = self.add_measurement_noise(X_do, rng)
# Don't add measurement noise to initial condition
X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng)

return X_do

Expand Down Expand Up @@ -169,7 +170,8 @@ def simulate(
X_do[i + 1, :] = next_x

if self.measurement_noise_std is not None:
X_do = self.add_measurement_noise(X_do, rng)
# Don't add measurement noise to initial condition
X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng)

return X_do

Expand Down Expand Up @@ -287,6 +289,10 @@ def simulate(
if intervention is not None:
X_do[-1] = intervention(X_do[-1], time_points[-1])

if self.measurement_noise_std is not None:
# Don't add measurement noise to initial condition
X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng)

return X_do

@abstractmethod
Expand Down
196 changes: 196 additions & 0 deletions interfere/dynamics/kuramoto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""Contains several variants of the Kuramoto model.
1. Standard Kuramoto
2. Kuramoto-Sakaguchi
3. Stuart-Landau Kuramoto
See S2.3.2 of Cliff et. al. 2023 "Unifying Pairwise..."
For the kuramoto model Cliff et. al. used three coupling schemes (1) all to all
(2) bidirectional list (3) grid four, where each oscilator is connected to four
neighbors.
"""
from typing import Optional, Callable

import numpy as np

from .base import StochasticDifferentialEquation, DEFAULT_RANGE
from ..utils import copy_doc


def kuramoto_intervention_wrapper(
intervention: Callable[[np.ndarray, float], np.ndarray]
) -> Callable[[np.ndarray, float], np.ndarray]:
"""Wraps the intervention in arcsin.
This is done so that the final simulation has the correct intervention
values.
Note: the range of the intervention must be [-1, 1].
Returns:
kuramoto_intervention (callable): arcsin(intervention(x, t)).
"""

def kuramoto_intervention(x: np.array, t: float):
"""Wraps intervention in arcsin(x)"""
x_do = intervention(x, t)
altered = x_do != x
if np.any(np.abs(x_do[altered])) > 1:
raise ValueError("For the kuramoto models, the range of "
" interventions must fall within [-1, 1]")

x_do[altered] = np.arcsin(x_do[altered])
return x_do


return kuramoto_intervention


class Kuramoto(StochasticDifferentialEquation):


def __init__(
self,
omega: np.ndarray,
K: float,
adjacency_matrix: np.ndarray,
sigma: float = 0,
measurement_noise_std: Optional[np.ndarray] = None
):
"""Initializes a Kuramoto SDE with independent noise.
dtheta_i = (omega_i + (K/M) sum_j a_{ij} sin(theta_j - theta_i)) dt +
sigma dW_i
where M is the number of nodes in the network.
The model returns sin(theta) to avoid discontinuities in the phase.
Similarly, the intervention is operates on the phase, but sin(x) is
applied to every state after the simulation is finished.
Args:
omega (np.ndarray): The natural frequency of each oscilator.
K (float): The coupling constant.
adjacency_matrix (np.ndarray): A matrix containing the connectivity.
sigma (float): Parameter controlling the standard deiation of
system noise.
measurement_noise_std (ndarray): None, or a vector with shape (n,)
where each entry corresponds to the standard deviation of the
measurement noise for that particular dimension of the dynamic
model. For example, if the dynamic model had two variables x1
and x2 and measurement_noise_std = [1, 10], then independent
gaussian noise with standard deviation 1 and 10 will be added to
x1 and x2 respectively at each point in time.
"""
dim = adjacency_matrix.shape[0]
self.omega = omega
self.K = K
self.adjacency_matrix = adjacency_matrix
self.Sigma = sigma * np.diag(np.ones(dim))
super().__init__(dim, measurement_noise_std)


@copy_doc(StochasticDifferentialEquation.simulate)
def simulate(
self,
initial_condition: np.ndarray,
time_points: np.ndarray,
intervention: Optional[Callable[[np.ndarray, float], np.ndarray]]= None,
rng: np.random.mtrand.RandomState = DEFAULT_RANGE,
dW: Optional[np.ndarray] = None,
) -> np.ndarray:
# Check initial condition.
if np.any(np.abs(initial_condition) > 1):
raise ValueError("Kuramoto Models require initial conditions in "
"the interval (-1, 1).")
# Extract phase of the initial condition.
theta0 = np.arcsin(initial_condition)

# Wrap the intervention in arcsin. Its range must be [-1, 1].
if intervention is not None:
intervention=kuramoto_intervention_wrapper(intervention)

# Turn off measurment noise in order to add it after the transformation.
measurement_noise_std = self.measurement_noise_std
self.measurement_noise_std = None

X_do = super().simulate(
theta0,
time_points,
intervention=intervention,
rng=rng,
dW=dW
)
# Return sin of the phase. (This undoes the arcsin transformations.)
X_do = np.sin(X_do)

self.measurement_noise_std = measurement_noise_std
if measurement_noise_std is not None:
# Don't add noise to initial condition.
X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng)

return X_do


def drift(self, theta: np.ndarray, t: float):
one = np.ones(self.dim)
prefactor = self.K / self.dim
theta_j = np.outer(one, theta)
theta_i = np.outer(theta, one)

return self.omega + prefactor * (
self.adjacency_matrix * np.sin(theta_j - theta_i)).dot(one)

def noise(self, theta: np.ndarray, t):
return self.Sigma


class KuramotoSakaguchi(Kuramoto):

def __init__(
self,
omega: np.ndarray,
K: float,
adjacency_matrix: np.ndarray,
phase_frustration: np.ndarray,
sigma: float = 0,
measurement_noise_std: Optional[np.ndarray] = None
):
"""Initializes a Kuramoto-Sakaguchi SDE with independent noise.
dtheta_i =
(omega_i + (K/M) sum_j a_{ij} sin(theta_j - theta_i - A_{ij)}) dt
+ sigma dW_i
where M is the number of nodes in the network and Z is the phase
frustration matrix.
Args:
omega (np.ndarray): The natural frequency of each oscilator.
K (float): The coupling constant.
adjacency_matrix (np.ndarray): A matrix containing the connectivity.
phase_frustration (np.ndarray):
sigma (float): Parameter controlling the standard deiation of
system noise.
measurement_noise_std (ndarray): None, or a vector with shape (n,)
where each entry corresponds to the standard deviation of the
measurement noise for that particular dimension of the dynamic
model. For example, if the dynamic model had two variables x1
and x2 and measurement_noise_std = [1, 10], then independent
gaussian noise with standard deviation 1 and 10 will be added to
x1 and x2 respectively at each point in time.
"""
self.phase_frustration = phase_frustration
super().__init__(
omega, K, adjacency_matrix, sigma, measurement_noise_std)

def drift(self, theta: np.ndarray, t: float):
one = np.ones(self.dim)
prefactor = self.K / self.dim
theta_j = np.outer(one, theta)
theta_i = np.outer(theta, one)

return self.omega + prefactor * (
self.adjacency_matrix * np.sin(
theta_j - theta_i - self.phase_frustration)).dot(one)
7 changes: 6 additions & 1 deletion interfere/dynamics/statespace_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,15 @@ def simulate(

# Optional intervention
if intervention is not None:
x_next = intervention(x_next)
x_next = intervention(x_next, time_points[i])

X[p+i, :] = x_next

if self.measurement_noise_std is not None:
X = self.add_measurement_noise(X, rng)
# Preserve initial conditions.
X[:p, :] = initial_condition

return X


Loading

0 comments on commit 78f165e

Please sign in to comment.