diff --git a/interfere/dynamics/__init__.py b/interfere/dynamics/__init__.py index b3ee94c..48be3ac 100644 --- a/interfere/dynamics/__init__.py +++ b/interfere/dynamics/__init__.py @@ -21,6 +21,7 @@ StandardTNoise ) from .geometric_brownian_motion import GeometricBrownianMotion +from .kuramoto import Kuramoto, KuramotoSakaguchi from .lotka_voltera import LotkaVoltera, LotkaVolteraSDE from .ornstein_uhlenbeck import OrnsteinUhlenbeck from .quadratic_sdes import Belozyorov3DQuad, Liping3DQuadFinance diff --git a/interfere/dynamics/base.py b/interfere/dynamics/base.py index 08dd918..1e236ce 100644 --- a/interfere/dynamics/base.py +++ b/interfere/dynamics/base.py @@ -65,7 +65,8 @@ def simulate( # Optionally add measurement noise if self.measurement_noise_std is not None: - X_do = self.add_measurement_noise(X_do, rng) + # Don't add measurement noise to initial condition + X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng) return X_do @@ -169,7 +170,8 @@ def simulate( X_do[i + 1, :] = next_x if self.measurement_noise_std is not None: - X_do = self.add_measurement_noise(X_do, rng) + # Don't add measurement noise to initial condition + X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng) return X_do @@ -287,6 +289,10 @@ def simulate( if intervention is not None: X_do[-1] = intervention(X_do[-1], time_points[-1]) + if self.measurement_noise_std is not None: + # Don't add measurement noise to initial condition + X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng) + return X_do @abstractmethod diff --git a/interfere/dynamics/kuramoto.py b/interfere/dynamics/kuramoto.py new file mode 100644 index 0000000..0eb8e8f --- /dev/null +++ b/interfere/dynamics/kuramoto.py @@ -0,0 +1,196 @@ +"""Contains several variants of the Kuramoto model. + +1. Standard Kuramoto +2. Kuramoto-Sakaguchi +3. Stuart-Landau Kuramoto + +See S2.3.2 of Cliff et. al. 2023 "Unifying Pairwise..." + +For the kuramoto model Cliff et. al. used three coupling schemes (1) all to all +(2) bidirectional list (3) grid four, where each oscilator is connected to four +neighbors. +""" +from typing import Optional, Callable + +import numpy as np + +from .base import StochasticDifferentialEquation, DEFAULT_RANGE +from ..utils import copy_doc + + +def kuramoto_intervention_wrapper( + intervention: Callable[[np.ndarray, float], np.ndarray] + ) -> Callable[[np.ndarray, float], np.ndarray]: + """Wraps the intervention in arcsin. + + This is done so that the final simulation has the correct intervention + values. + + Note: the range of the intervention must be [-1, 1]. + + Returns: + kuramoto_intervention (callable): arcsin(intervention(x, t)). + """ + + def kuramoto_intervention(x: np.array, t: float): + """Wraps intervention in arcsin(x)""" + x_do = intervention(x, t) + altered = x_do != x + if np.any(np.abs(x_do[altered])) > 1: + raise ValueError("For the kuramoto models, the range of " + " interventions must fall within [-1, 1]") + + x_do[altered] = np.arcsin(x_do[altered]) + return x_do + + + return kuramoto_intervention + + +class Kuramoto(StochasticDifferentialEquation): + + + def __init__( + self, + omega: np.ndarray, + K: float, + adjacency_matrix: np.ndarray, + sigma: float = 0, + measurement_noise_std: Optional[np.ndarray] = None + ): + """Initializes a Kuramoto SDE with independent noise. + + dtheta_i = (omega_i + (K/M) sum_j a_{ij} sin(theta_j - theta_i)) dt + + sigma dW_i + + where M is the number of nodes in the network. + + The model returns sin(theta) to avoid discontinuities in the phase. + Similarly, the intervention is operates on the phase, but sin(x) is + applied to every state after the simulation is finished. + + Args: + omega (np.ndarray): The natural frequency of each oscilator. + K (float): The coupling constant. + adjacency_matrix (np.ndarray): A matrix containing the connectivity. + sigma (float): Parameter controlling the standard deiation of + system noise. + measurement_noise_std (ndarray): None, or a vector with shape (n,) + where each entry corresponds to the standard deviation of the + measurement noise for that particular dimension of the dynamic + model. For example, if the dynamic model had two variables x1 + and x2 and measurement_noise_std = [1, 10], then independent + gaussian noise with standard deviation 1 and 10 will be added to + x1 and x2 respectively at each point in time. + """ + dim = adjacency_matrix.shape[0] + self.omega = omega + self.K = K + self.adjacency_matrix = adjacency_matrix + self.Sigma = sigma * np.diag(np.ones(dim)) + super().__init__(dim, measurement_noise_std) + + + @copy_doc(StochasticDifferentialEquation.simulate) + def simulate( + self, + initial_condition: np.ndarray, + time_points: np.ndarray, + intervention: Optional[Callable[[np.ndarray, float], np.ndarray]]= None, + rng: np.random.mtrand.RandomState = DEFAULT_RANGE, + dW: Optional[np.ndarray] = None, + ) -> np.ndarray: + # Check initial condition. + if np.any(np.abs(initial_condition) > 1): + raise ValueError("Kuramoto Models require initial conditions in " + "the interval (-1, 1).") + # Extract phase of the initial condition. + theta0 = np.arcsin(initial_condition) + + # Wrap the intervention in arcsin. Its range must be [-1, 1]. + if intervention is not None: + intervention=kuramoto_intervention_wrapper(intervention) + + # Turn off measurment noise in order to add it after the transformation. + measurement_noise_std = self.measurement_noise_std + self.measurement_noise_std = None + + X_do = super().simulate( + theta0, + time_points, + intervention=intervention, + rng=rng, + dW=dW + ) + # Return sin of the phase. (This undoes the arcsin transformations.) + X_do = np.sin(X_do) + + self.measurement_noise_std = measurement_noise_std + if measurement_noise_std is not None: + # Don't add noise to initial condition. + X_do[1:, :] = self.add_measurement_noise(X_do[1:, :], rng) + + return X_do + + + def drift(self, theta: np.ndarray, t: float): + one = np.ones(self.dim) + prefactor = self.K / self.dim + theta_j = np.outer(one, theta) + theta_i = np.outer(theta, one) + + return self.omega + prefactor * ( + self.adjacency_matrix * np.sin(theta_j - theta_i)).dot(one) + + def noise(self, theta: np.ndarray, t): + return self.Sigma + + +class KuramotoSakaguchi(Kuramoto): + + def __init__( + self, + omega: np.ndarray, + K: float, + adjacency_matrix: np.ndarray, + phase_frustration: np.ndarray, + sigma: float = 0, + measurement_noise_std: Optional[np.ndarray] = None + ): + """Initializes a Kuramoto-Sakaguchi SDE with independent noise. + + dtheta_i = + (omega_i + (K/M) sum_j a_{ij} sin(theta_j - theta_i - A_{ij)}) dt + + sigma dW_i + + where M is the number of nodes in the network and Z is the phase + frustration matrix. + + Args: + omega (np.ndarray): The natural frequency of each oscilator. + K (float): The coupling constant. + adjacency_matrix (np.ndarray): A matrix containing the connectivity. + phase_frustration (np.ndarray): + sigma (float): Parameter controlling the standard deiation of + system noise. + measurement_noise_std (ndarray): None, or a vector with shape (n,) + where each entry corresponds to the standard deviation of the + measurement noise for that particular dimension of the dynamic + model. For example, if the dynamic model had two variables x1 + and x2 and measurement_noise_std = [1, 10], then independent + gaussian noise with standard deviation 1 and 10 will be added to + x1 and x2 respectively at each point in time. + """ + self.phase_frustration = phase_frustration + super().__init__( + omega, K, adjacency_matrix, sigma, measurement_noise_std) + + def drift(self, theta: np.ndarray, t: float): + one = np.ones(self.dim) + prefactor = self.K / self.dim + theta_j = np.outer(one, theta) + theta_i = np.outer(theta, one) + + return self.omega + prefactor * ( + self.adjacency_matrix * np.sin( + theta_j - theta_i - self.phase_frustration)).dot(one) diff --git a/interfere/dynamics/statespace_models.py b/interfere/dynamics/statespace_models.py index e344726..a57d5a1 100644 --- a/interfere/dynamics/statespace_models.py +++ b/interfere/dynamics/statespace_models.py @@ -142,10 +142,15 @@ def simulate( # Optional intervention if intervention is not None: - x_next = intervention(x_next) + x_next = intervention(x_next, time_points[i]) X[p+i, :] = x_next + if self.measurement_noise_std is not None: + X = self.add_measurement_noise(X, rng) + # Preserve initial conditions. + X[:p, :] = initial_condition + return X \ No newline at end of file diff --git a/tests/test_dynamics.py b/tests/test_dynamics.py index f6f8dd1..8c5b784 100644 --- a/tests/test_dynamics.py +++ b/tests/test_dynamics.py @@ -1,5 +1,7 @@ import interfere +from interfere.dynamics.decoupled_noise_dynamics import UncorrelatedNoise import numpy as np +import pytest from scipy import integrate import sdeint from statsmodels.tsa.vector_ar.util import varsim @@ -33,6 +35,84 @@ coupled_map_1dlattice_traveling_wave ] +def check_simulate_method( + model: interfere.base.DynamicModel, + x0: np.ndarray = None, + ): + """Checks that model.simulate() runs and interventions occur appropriately. + + Args: + model (interfere.base.DynamicModel): An initialized model. + x0: Optional initial condition for simulate. + """ + n = model.dim + m = 1000 + rng = np.random.default_rng(10) + if x0 is None: + x0 = np.random.rand(n) + t = np.linspace(0, 10, m) + # Save original measurement noise + orig_measurement_noise = model.measurement_noise_std + + # Adjust time scale for discrete time models + if isinstance(model, interfere.dynamics.base.DiscreteTimeDynamics): + t = np.arange(100) + m = 100 + + # For non noise models, add measurement noise: + if not isinstance(model,UncorrelatedNoise): + model.measurement_noise_std = 0.2 * np.ones(n) + + # Make intervention + interv_idx = 0 + interv_const = 0.5 + g = interfere.perfect_intervention(interv_idx, interv_const) + + # Check output shape + rng = np.random.default_rng(10) + X = model.simulate(x0, t, rng=rng) + assert X.shape == (m, n), ( + f"Output is the wrong shape"" for {model}.") + + # Check initial condition + if x0.ndim == 1: + assert np.allclose(X[0], x0), ( + f"Initial condition is incorrect for {model}.") + + elif x0.ndim == 2: + p, _ = x0.shape + assert np.allclose(X[:p, :], x0), ( + f"Initial condition is incorrect for {model}.") + + + # Check that random state works correctly + rng = np.random.default_rng(10) + X_rerun = model.simulate(x0, t, rng=rng) + assert np.all(X == X_rerun), ( + f"Random state does not preserve noise for {model}.") + + # Apply an intervention + rng = np.random.default_rng(10) + X_do = model.simulate(x0, t, intervention=g, rng=rng) + assert X_do.shape == (m, n), ( + f"Incorrect output size after intervention for {model}.") + + assert np.abs(np.mean(X_do[:, interv_idx]) - interv_const) < 0.1, ( + f"Intervention is incorrect for {model}.") + + # Make sure that random state works for interventions + rng = np.random.default_rng(10) + X_do_rerun = model.simulate(x0, t, intervention=g, rng=rng) + assert np.allclose(X_do, X_do_rerun), (f"Random state does not preserve " + "values after intervention for " + " {model}.") + + + # Reset to original measurement noise + model.measurement_noise_std = orig_measurement_noise + + return X, X_do + def test_lotka_voltera(): # Initialize interfere.LotkaVoltera model. n = 10 @@ -44,6 +124,9 @@ def test_lotka_voltera(): interv_const = 1.0 model = interfere.dynamics.LotkaVoltera(r, k, A) + # Standard checks for intervene.base.DynamicModel objects + check_simulate_method(model) + # Make two kinds of interventions perf_interv = interfere.perfect_intervention(interv_idx, interv_const) sin_interv = interfere.signal_intervention(interv_idx, np.sin) @@ -78,7 +161,7 @@ def sin_int_true_deriv(x, t): assert np.allclose(true_sin_X, interfere_sin_X) -def test_ornstein_uhlenbeck(): +def test_ornstein_uhlenbeck_and_sde_integrator(): seed = 11 rng = np.random.default_rng(seed) n = 3 @@ -87,6 +170,8 @@ def test_ornstein_uhlenbeck(): sigma = rng.random((n, n))- 0.5 model = interfere.dynamics.OrnsteinUhlenbeck(theta, mu, sigma) + # Standard checks for intervene.base.DynamicModel objects + check_simulate_method(model) x0 = np.random.rand(n) tspan = np.linspace(0, 10, 1000) @@ -156,20 +241,10 @@ def test_ornstein_uhlenbeck(): def test_coupled_logistic_map(): rng = np.random.default_rng(10) - A = rng.random((10, 10)) < 0.5 model = interfere.dynamics.coupled_logistic_map(A) - x0 = rng.random(10) - t = range(100) - X = model.simulate(x0, t) - assert X.shape == (100, 10) - - # Make an intervention function - interv_idx = 0 - interv_const = 1.0 - intervention = interfere.perfect_intervention(interv_idx, interv_const) - X_do = model.simulate(x0, t, intervention) - assert np.all(X_do[:, interv_idx] == interv_const) + # Standard checks for intervene.base.DynamicModel objects + check_simulate_method(model) def test_coupled_map_lattice(): @@ -180,17 +255,8 @@ def test_coupled_map_lattice(): for cml in COUPLED_MAP_LATTICES: model = cml(ndims) x0 = rng.random(10) - t = range(100) - X = model.simulate(x0, t) - assert X.shape == (100, 10) - - # Make an intervention function - interv_idx = 0 - interv_const = 1.0 - intervention = interfere.perfect_intervention(interv_idx, interv_const) - X_do = model.simulate(x0, t, intervention) - assert np.all(X_do[:, interv_idx] == interv_const) - + # Standard checks for intervene.base.DynamicModel objects + check_simulate_method(model) def test_stochastic_coupled_map_lattice(): @@ -199,38 +265,20 @@ def test_stochastic_coupled_map_lattice(): for cml in COUPLED_MAP_LATTICES: model = cml(ndims, sigma=0.1) - x0 = rng.random(10) - t = range(100) - X = model.simulate(x0, t) - assert X.shape == (100, 10) - - # Make an intervention function - interv_idx = 0 - interv_const = 1.0 - intervention = interfere.perfect_intervention(interv_idx, interv_const) - X_do = model.simulate(x0, t, intervention) - assert np.all(X_do[:, interv_idx] == interv_const) + # Standard checks for intervene.base.DynamicModel objects + check_simulate_method(model) def test_normal_noise(): rng = np.random.default_rng(11) model = interfere.dynamics.StandardNormalNoise(5) - X = model.simulate(np.ones(5), np.arange(1000), rng=rng) - - # Check initial condition - assert np.all(X[0] == 1) + # Standard checks for intervene.base.DynamicModel objects + X, X_do = check_simulate_method(model) # Check that the normal distribution works as expected assert np.all(np.abs(np.mean(X[1:, :], axis=0)) < 0.1) assert np.all(np.abs(np.std(X[1:, :], axis=0) - 1) < 0.1) - f = interfere.perfect_intervention(0, 30.0) - rng = np.random.default_rng(11) - X_do = model.simulate(np.ones(5), np.arange(1000), f, rng=rng) - - # Check that the intervention was applied - assert np.all(X_do[:, 0] == 30.0) - # Check that the random state generated reproducible noise assert np.all(X_do[:, 1:] == X[:, 1:]) @@ -243,20 +291,8 @@ def test_noise_dynamics(): for model_class in noise_models: dim = 5 model = model_class(dim) - - rng = np.random.default_rng(11) - X_do = model.simulate(np.ones(dim), np.arange(1000), f, rng=rng) - - # Check that the intervention was applied - assert np.all(X_do[:, 0] == 30.0) - - # Check that the array is the right size - assert X_do.shape == (1000, dim) - - # Check that random state is working properly - rng = np.random.default_rng(11) - X_do2 = model.simulate(np.ones(dim), np.arange(1000), f, rng=rng) - assert np.all(X_do == X_do2) + # Standard checks for intervene.base.DynamicModel objects + check_simulate_method(model) def test_arithmetic_brownian_motion(): @@ -265,25 +301,12 @@ def test_arithmetic_brownian_motion(): mu = np.ones(n) * -1 sigma = np.ones(n) * 0.1 model = interfere.dynamics.ArithmeticBrownianMotion(mu=mu, sigma=sigma) + # Standard checks for intervene.base.DynamicModel objects + X, X_do =check_simulate_method(model) - rng = np.random.default_rng(27) - time_points = np.linspace(0, 10, m) - x0 = np.ones(n) - dW = np.random.randn(m, n) - X = model.simulate(x0, time_points, rng=rng, dW=dW) - - assert X.shape == (m, n) - - # Test that expectation matches the analytic expectation - diff = np.mean(X, axis=1) - (mu[0] * time_points + x0[0]) - assert np.all(np.abs(diff) < 0.25) - - f = interfere.perfect_intervention(0, 10) - Xdo = model.simulate(x0, time_points, f, rng=rng, dW=dW) - - assert np.any(Xdo != X) - assert np.all(Xdo[:, 1:] == X[:, 1:]) - assert np.all(Xdo[:, 0] == 10) + # Make sure that the intervention makes a difference but only on one signal. + assert np.any(X_do != X) + assert np.all(X_do[:, 1:] == X[:, 1:]) def test_geometric_brownian_motion(): @@ -292,6 +315,11 @@ def test_geometric_brownian_motion(): mu = np.ones(n) * -1 sigma = np.ones(n) * 0.1 model = interfere.dynamics.GeometricBrownianMotion(mu=mu, sigma=sigma) + + # Standard checks for intervene.base.DynamicModel objects + check_simulate_method(model) + + # Run additional checks rng = np.random.default_rng(27) time_points = np.linspace(0, 10, m) @@ -312,7 +340,7 @@ def test_geometric_brownian_motion(): assert np.all(Xdo[:, 1:] == X[:, 1:]) assert np.all(Xdo[:, 0] == 10) - +@pytest.mark.slow def test_varma(): seed = 1 rs = np.random.RandomState(seed) @@ -354,3 +382,31 @@ def test_varma(): assert np.all( np.abs(np.mean(true_var_sim - varma_sim, axis=0)) < 0.2 ) + +def test_varima_standard_checks(): + rng = np.random.default_rng(10) + coef_matrices = [rng.random((3, 3)) - 0.5 for i in range(5)] + Z = rng.random((3, 3)) + Sigma = Z * Z.T + + model = interfere.dynamics.VARMA_Dynamics( + phi_matrices=coef_matrices[:2], + theta_matrices=coef_matrices[2:], + sigma=Sigma + ) + check_simulate_method(model, x0=rng.random((3, 3))) + + +def test_kuramoto(): + rng = np.random.default_rng(10) + omega = rng.random(10) + K = 0.7 + A = rng.random((10, 10)) < .3 + sigma=0.1 + error_std = np.ones(10) + + # Run standard checks for interfere.base.DynamicModel objects. + model = interfere.dynamics.Kuramoto(omega, K, A, sigma) + check_simulate_method(model) + model = interfere.dynamics.KuramotoSakaguchi(omega, K, A, A, sigma) + check_simulate_method(model) \ No newline at end of file