diff --git a/interfere/dynamics/__init__.py b/interfere/dynamics/__init__.py index 3958bbb..b3ee94c 100644 --- a/interfere/dynamics/__init__.py +++ b/interfere/dynamics/__init__.py @@ -2,7 +2,7 @@ from .base import OrdinaryDifferentialEquation, StochasticDifferentialEquation from .coupled_map_lattice import ( coupled_logistic_map, - CoupledMapLattice, + StochasticCoupledMapLattice, coupled_map_1dlattice_chaotic_brownian, coupled_map_1dlattice_chaotic_traveling_wave, coupled_map_1dlattice_defect_turbulence, diff --git a/interfere/dynamics/base.py b/interfere/dynamics/base.py index 64536b6..08dd918 100644 --- a/interfere/dynamics/base.py +++ b/interfere/dynamics/base.py @@ -92,7 +92,6 @@ def simulate( initial_condition: np.ndarray, time_points: np.ndarray, intervention: Optional[Callable[[np.ndarray, float], np.ndarray]]= None, - # TODO: Change measurement noise so it is a data member. rng: np.random.mtrand.RandomState = DEFAULT_RANGE, dW: Optional[np.ndarray] = None, ) -> np.ndarray: @@ -282,7 +281,7 @@ def simulate( X_do[i] = intervention(X_do[i], time_points[i]) # Compute next state - X_do[i+1] = self.step(X_do[i]) + X_do[i+1] = self.step(X_do[i], rng) # After the loop, apply interention to the last step if intervention is not None: @@ -291,11 +290,13 @@ def simulate( return X_do @abstractmethod - def step(self, x: np.ndarray): + def step(self, x: np.ndarray, rng: np.random.mtrand.RandomState): """Uses the current state to compute the next state of the system. Args: x (np.ndarray): The current state of the system. + rng (RandomState): A numpy random state for generating random + numbers. Returns: x_next (np.ndarray): The next state of the system. diff --git a/interfere/dynamics/coupled_map_lattice.py b/interfere/dynamics/coupled_map_lattice.py index 454e08e..090be1b 100644 --- a/interfere/dynamics/coupled_map_lattice.py +++ b/interfere/dynamics/coupled_map_lattice.py @@ -1,3 +1,16 @@ +"""Contains classes for simulating and intervening on stochastic coupled map lattices. + +Notes: + 1. For different maps, the domain of the CML is different so initial + conditions must be sampled correctly. + a. Logistic map: x_i in (0.0, 1.0) + b. Quadradic map: x_i in (-1.0, 1.0) + 2. If stochastic noise is introduced, it must respect the domain of the CML + so that the orbits don't diverge. + 3. More discrete time stochastic maps can be found in Gade et. al. 1997 + "Stochastic Resonance in..." +""" + from typing import Optional, Callable import numpy as np @@ -6,12 +19,40 @@ from ..base import DEFAULT_RANGE -def logistic_map(x, alpha=3.72): +def logistic_map(x: np.ndarray, alpha: float = 3.72): return alpha * x * (1 - x) -def quadradic_map(x, alpha=1.45): + +def quadradic_map(x: np.ndarray, alpha:float = 1.45): return 1 - alpha * x ** 2 + +def mod_interval(x: np.ndarray, a: float, b: float): + """Applies the mod operation to ensure each element of x is in (a, b). + + Uses + + a + (b - a)(x mod 1) + + on entries of x that are not in the interval (a, b). + + Args: + x (ndarray): A numpy array. + a (float): The minimum value for entries in x. + b (float): The maximum value for entries in x + + Returns: + x_mod (ndarray): The array x with all entries that were + outside (a, b) modded so they are within (a, b). + """ + if b <= a: + raise ValueError("Argument `a` must be less than `b` argument.") + + outside = (x < a) | (x > b) + x_mod = x.copy() + x_mod[outside] = a + (b - a)(x_mod[outside] % 1) + + class CoupledMapLattice(DiscreteTimeDynamics): def __init__( @@ -28,7 +69,7 @@ def __init__( A coupled map lattice where coupling is determined by the passed adjacency matrix. - x[n+1]_i = (1 - eps) * f(x[n]_i) + (eps / degree(i)) sum_j Aij f(x[n]_j) + x[n+1]_i = (1 - eps) * f(x[n]_i) + (eps / degree(i)) sum_j Aji f(x[n]_j) Note: This is not a continuous time system. @@ -63,17 +104,15 @@ def __init__( self.row_sums = np.sum(self.adjacency_matrix, axis=1) super().__init__(self.adjacency_matrix.shape[0], measurement_noise_std) - - def step(self, x: np.ndarray): + def step(self, x: np.ndarray, rng: np.random.mtrand.RandomState): """One step forward in time for a coupled map lattice - A coupled map lattice where coupling is determined by the passed - adjacency matrix and the map applied is the logistic map. + A coupled map lattice where coupling is determined by + `self.adjacency_matrix` and the map is determined by `self.f`. x[n+1]_i = (1 - eps) * f(x[n]_i) + (eps / degree(i)) sum_j Aij f(x[n]_j) - where f(x) = logistic_param * x * (1 - x). """ x_next = (1 - self.eps) * self.f(x, *self.f_params) x_next += self.eps * self.adjacency_matrix @ self.f(x, *self.f_params) @@ -134,6 +173,109 @@ def simulate( return X +class StochasticCoupledMapLattice(CoupledMapLattice): + + def __init__( + self, + adjacency_matrix: np.array, + eps: float = 0.5, + f: Callable = quadradic_map, + f_params: tuple =(1.45,), + sigma: float = 0.0, + x_min: float = 0.0, + x_max: float = 1.0, + boundry_condition: Optional[str] = "none", + tsteps_btw_obs: int = 1, + measurement_noise_std: Optional[np.ndarray] = None + ): + """N-dimensional coupled logistic map. + + A coupled map lattice where coupling is determined by the passed + adjacency matrix. + + x[n+1]_i = (1 - eps) * f(x[n]_i) + (eps / degree(i)) sum_j Aij + f(x[n]_j) + omega[n]_i + + where omega[n]_i ~ Normal(0, sigma) and x[n+1]_i is constrained to + stay within the domain. + + When x[n+1]_i is not in (x_min, x_max) we replace x[n+1]_i with + + x_min + (x_max - x_min) * (x[n+1]_i mod 1) + + to ensure that the system remains in it's domain. + + Args: + adjacency_matrix (2D array): The adjacency matrix that defines the + way the variables are coupled. The entry A[i, j] should contain + the weight of the link from x_j to x_i. + eps (float): A parameter that controls the relative strenth of + coupling. High epsilon means greater connection to neighbors. + f (Callable): A function to be used as f(x) in the equation above. + Should accept a numpy array followed by an arbitrary number of + scalar parameters. + f_params (tuple): A tuple of floats that will be unpacked and + passed to f as so: `f(x, **f_params)`. + sigma (float): The standard deviation of the additive gaussian noise + in the model. + x_min (float): Optional minimum bound (applied to state + elementwise) to ensure that the noise does not peturb the system out of it's domain. + x_max (float): Optional maximum bound (applied to state + elementwise) to ensure that the noise does not peturb the system + out of it's domain. + boundry_condition (string): One of ["none", "truncate", "mod"]. If + "truncate", is selected, entries in `x` are reset to be the + closer of `x_min` or `x_max` when they leave the domain. If + "mod" is selected, and and entry in `x`, `x_i` is not in + `(x_min, x_max)` then `x_i` is set to: + `x_i = x_min + (x_max - x_min) * (x_i mod 1)` + tsteps_per_obs (int): The number of timesteps between observations. + The downsample rate. (Interventions will still be applied between observations.) + measurement_noise_std (ndarray): None, or a vector with shape (n,) + where each entry corresponds to the standard deviation of the + measurement noise for that particular dimension of the dynamic + model. For example, if the dynamic model had two variables x1 + and x2 and `measurement_noise_std = [1, 10]`, then + independent gaussian noise with standard deviation 1 and 10 + will be added to x1 and x2 respectively at each point in time. + """ + self.sigma = sigma + self.x_max = x_max + self.x_min = x_min + self.boundry_condition = boundry_condition + super().__init__( + adjacency_matrix, eps, f, f_params, tsteps_btw_obs, measurement_noise_std) + + + + def step(self, x: np.ndarray, rng: np.random.mtrand.RandomState): + """One step forward in time for a stochastic coupled map lattice. + + A stochastic coupled map lattice where coupling is determined by + `self.adjacency_matrix` and the map is determined by `self.f`. + + x[n+1]_i = (1 - eps) * f(x[n]_i) + (eps / degree(i)) sum_j Aij f(x[n]_j)+ w[n]_i + + where w[n] ~ N(0, sigma) and x_i is constrained to be in the interval + (self.x_min, self.x_max). + """ + x_next = super().step(x, rng) + + # This check enables sigma == 0.0 to generate deterministic dynamics. + if self.sigma != 0.0: + x_next += rng.normal(0, self.sigma, size=self.dim) + + # See if the state is within boundary and apply appropriate transform. + if self.boundry_condition == "mod": + x_next = mod_interval(x_next, self.x_min, self.x_max) + + elif self.boundry_condition == "truncate": + x_next[x_next < self.x_min] = self.x_min + x_next[x_next > self.x_max] = self.x_max + + return x_next + + def coupled_logistic_map( adjacency_matrix: np.array, logistic_param=3.72, @@ -170,10 +312,11 @@ def coupled_logistic_map( -def coupled_map_1dlattice( - ndims: int, +def stochastic_coupled_map_1dlattice( + dim: int, alpha: float, eps: float, + sigma: float = 0.0, tsteps_btw_obs: int = 1, measurement_noise_std: Optional[np.ndarray] = None ): @@ -181,14 +324,18 @@ def coupled_map_1dlattice( The dynamics are governed by the following equation: - x[n+1]_i = (1 - eps) * f(x[n]_i) + (eps / 2) * (f(x[n]_i+1) + f(x[n]_i-1)) + x[n+1]_i = (1 - eps) * f(x[n]_i) + (eps / 2) * (f(x[n]_i+1) + f(x[n]_i-1)) + w[n]_i - where f(x) = 1 - alpha * x^2. + where f(x) = 1 - alpha * x^2, w[n]_i ~ N(0, sigma) and x[n+1]_i is + constrained to stay in the interval [-1, 1] via truncation. Setting + sigma = 0 will result in a deterministic system. Args: ndmis: The number of nodes in the 1D lattice. alpha: The quadratic map parameter (1 - alpha * x^2) eps: The lattice coupling parameter. + sigma (float): The standard deviation of the additive gaussian noise + in the model. tsteps_per_obs (int): The number of timesteps between observations. The downsample rate. (Interventions will still be applied between observations.) measurement_noise_std (ndarray): None, or a vector with shape (n,) @@ -200,14 +347,23 @@ def coupled_map_1dlattice( will be added to x1 and x2 respectively at each point in time. """ # Construct the 1D lattice matrix - ones = np.ones(ndims - 1) + ones = np.ones(dim - 1) adjacency_matrix = np.diag(ones, k=1) + np.diag(ones, k=-1) - return CoupledMapLattice( - adjacency_matrix, eps, quadradic_map, (alpha,), tsteps_btw_obs, measurement_noise_std) + return StochasticCoupledMapLattice( + adjacency_matrix, + eps, + quadradic_map, + (alpha,), + sigma=sigma, + x_min=-1, + x_max=1, + boundry_condition="truncate", + tsteps_btw_obs=tsteps_btw_obs, measurement_noise_std=measurement_noise_std + ) def coupled_map_1dlattice_frozen_chaos( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -226,6 +382,8 @@ def coupled_map_1dlattice_frozen_chaos( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -234,12 +392,12 @@ def coupled_map_1dlattice_frozen_chaos( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.45, 0.2, measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.45, 0.2, measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_pattern_selection( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -258,6 +416,8 @@ def coupled_map_1dlattice_pattern_selection( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -266,11 +426,11 @@ def coupled_map_1dlattice_pattern_selection( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.71, 0.4, measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.71, 0.4, measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_chaotic_brownian( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -289,6 +449,8 @@ def coupled_map_1dlattice_chaotic_brownian( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -297,12 +459,13 @@ def coupled_map_1dlattice_chaotic_brownian( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.85, 0.1, measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.85, 0.1, sigma=sigma, + measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_defect_turbulence( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -321,6 +484,8 @@ def coupled_map_1dlattice_defect_turbulence( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -329,12 +494,13 @@ def coupled_map_1dlattice_defect_turbulence( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.895, 0.1, measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.895, 0.1, sigma=sigma, + measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_spatiotemp_intermit1( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -354,6 +520,8 @@ def coupled_map_1dlattice_spatiotemp_intermit1( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -362,12 +530,13 @@ def coupled_map_1dlattice_spatiotemp_intermit1( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.71, 0.4, tsteps_btw_obs=12,measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.71, 0.4, tsteps_btw_obs=12, sigma=sigma, + measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_spatiotemp_intermit2( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -386,6 +555,8 @@ def coupled_map_1dlattice_spatiotemp_intermit2( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -394,12 +565,13 @@ def coupled_map_1dlattice_spatiotemp_intermit2( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.75, 0.3, measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.75, 0.3, sigma=sigma, + measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_spatiotemp_chaos( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -418,6 +590,8 @@ def coupled_map_1dlattice_spatiotemp_chaos( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -426,12 +600,13 @@ def coupled_map_1dlattice_spatiotemp_chaos( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 2.0, 0.3, measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 2.0, 0.3, sigma=sigma, + measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_traveling_wave( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -451,6 +626,8 @@ def coupled_map_1dlattice_traveling_wave( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -459,12 +636,13 @@ def coupled_map_1dlattice_traveling_wave( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.67, 0.5, tsteps_btw_obs=2000,measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.67, 0.5, tsteps_btw_obs=2000, sigma=sigma, + measurement_noise_std=measurement_noise_std) def coupled_map_1dlattice_chaotic_traveling_wave( - ndims: int=10, + dim: int=10, sigma: float=0, measurement_noise_std: Optional[np.ndarray] = None ): """Intitializes a specific parameterization of a CoupledMapLattice. @@ -484,6 +662,8 @@ def coupled_map_1dlattice_chaotic_traveling_wave( Args: ndmis: The number of nodes in the model. + sigma (float): The standard deviation of the additive gaussian noise + in the model. measurement_noise_std (ndarray): None, or a vector with shape (n,) where each entry corresponds to the standard deviation of the measurement noise for that particular dimension of the dynamic @@ -492,5 +672,6 @@ def coupled_map_1dlattice_chaotic_traveling_wave( independent gaussian noise with standard deviation 1 and 10 will be added to x1 and x2 respectively at each point in time. """ - return coupled_map_1dlattice( - ndims, 1.69, 0.5, tsteps_btw_obs=5000,measurement_noise_std=measurement_noise_std) + return stochastic_coupled_map_1dlattice( + dim, 1.69, 0.5, tsteps_btw_obs=5000, sigma=sigma, + measurement_noise_std=measurement_noise_std) \ No newline at end of file diff --git a/tests/test_dynamics.py b/tests/test_dynamics.py index c43623c..f6f8dd1 100644 --- a/tests/test_dynamics.py +++ b/tests/test_dynamics.py @@ -21,6 +21,18 @@ coupled_map_1dlattice_traveling_wave ) +COUPLED_MAP_LATTICES = [ + coupled_map_1dlattice_chaotic_brownian, + coupled_map_1dlattice_chaotic_traveling_wave, + coupled_map_1dlattice_defect_turbulence, + coupled_map_1dlattice_frozen_chaos, + coupled_map_1dlattice_pattern_selection, + coupled_map_1dlattice_spatiotemp_chaos, + coupled_map_1dlattice_spatiotemp_intermit1, + coupled_map_1dlattice_spatiotemp_intermit2, + coupled_map_1dlattice_traveling_wave +] + def test_lotka_voltera(): # Initialize interfere.LotkaVoltera model. n = 10 @@ -161,21 +173,11 @@ def test_coupled_logistic_map(): def test_coupled_map_lattice(): - coupled_map_lattices = [ - coupled_map_1dlattice_chaotic_brownian, - coupled_map_1dlattice_chaotic_traveling_wave, - coupled_map_1dlattice_defect_turbulence, - coupled_map_1dlattice_frozen_chaos, - coupled_map_1dlattice_pattern_selection, - coupled_map_1dlattice_spatiotemp_chaos, - coupled_map_1dlattice_spatiotemp_intermit1, - coupled_map_1dlattice_spatiotemp_intermit2, - coupled_map_1dlattice_traveling_wave - ] + rng = np.random.default_rng(10) ndims = 10 - for cml in coupled_map_lattices: + for cml in COUPLED_MAP_LATTICES: model = cml(ndims) x0 = rng.random(10) t = range(100) @@ -190,6 +192,26 @@ def test_coupled_map_lattice(): assert np.all(X_do[:, interv_idx] == interv_const) +def test_stochastic_coupled_map_lattice(): + + rng = np.random.default_rng(10) + ndims = 10 + + for cml in COUPLED_MAP_LATTICES: + model = cml(ndims, sigma=0.1) + x0 = rng.random(10) + t = range(100) + X = model.simulate(x0, t) + assert X.shape == (100, 10) + + # Make an intervention function + interv_idx = 0 + interv_const = 1.0 + intervention = interfere.perfect_intervention(interv_idx, interv_const) + X_do = model.simulate(x0, t, intervention) + assert np.all(X_do[:, interv_idx] == interv_const) + + def test_normal_noise(): rng = np.random.default_rng(11) model = interfere.dynamics.StandardNormalNoise(5)