Skip to content

Commit

Permalink
tweaks to method tests, added VARMA dynamics
Browse files Browse the repository at this point in the history
  • Loading branch information
djpasseyjr committed Jan 23, 2024
1 parent 80bf600 commit 8827e0e
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# VS Code
*.code-workspace

# iOS
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
17 changes: 8 additions & 9 deletions interfere/dynamics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
from .arithmetic_brownian_motion import ArithmeticBrownianMotion
from .base import OrdinaryDifferentialEquation, StochasticDifferentialEquation
from .coupled_logistic_maps import CoupledLogisticMaps
from .decoupled_noise_dynamics import (
StandardNormalNoise,
StandardCauchyNoise,
StandardExponentialNoise,
StandardGammaNoise,
StandardTNoise
)
from .geometric_brownian_motion import GeometricBrownianMotion
from .lotka_voltera import LotkaVoltera, LotkaVolteraSDE
from .ornstein_uhlenbeck import OrnsteinUhlenbeck
from .quadratic_sdes import Belozyorov3DQuad, Liping3DQuadFinance

from .simple_linear_sdes import (
DampedOscillator,
LinearSDE,
imag_roots_2d_linear_sde,
imag_roots_4d_linear_sde,
attracting_fixed_point_4d_linear_sde
)

from .decoupled_noise_dynamics import (
StandardNormalNoise,
StandardCauchyNoise,
StandardExponentialNoise,
StandardGammaNoise,
StandardTNoise
)
from .statespace_models import VARMA_Dynamics
88 changes: 86 additions & 2 deletions interfere/dynamics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,94 @@ def simulate(
return X_do

@abstractmethod
def step(self, x_n: np.ndarray):
def step(self, x: np.ndarray):
"""Uses the current state to compute the next state of the system.
Args:
x_n (np.ndarray): The current state of the system.
x (np.ndarray): The current state of the system.
Returns:
x_next (np.ndarray): The next state of the system.
"""
raise NotImplementedError()


class DiscreteTimeDelayDynamics(DynamicModel):

def simulate(
self,
initial_condition: np.ndarray,
time_points: np.ndarray,
intervention: Optional[Callable[[np.ndarray, float], np.ndarray]]= None,
rng: np.random.mtrand.RandomState = DEFAULT_RANGE,
) -> np.ndarray:
"""Runs a simulation of the discrete time dynamic model.
Args:
initial_condition (ndarray): A (m, p) array of the historical
condition of the dynamic model.
time_points (ndarray): A (n,) array of the time points where the
dynamic model will be simulated. Must be integers
intervention (callable): A function that accepts (1) a vector of the
current state of the dynamic model and (2) the current time. It should return a modified state. The function will be used in the
following way:
If the dynamic model without the intervention can be described
as
x(t+dt) = F(x(t))
where dt is the timestep size, x(t) is the trajectory, and F is
the function that uses the current state to compute the state at
the next timestep. Then the intervention function will be used
to simulate the system
z(t+dt) = F(g(z(t), t), t)
x_do(t) = g(z(t), t)
where x_do is the trajectory of the intervened system and g is
the intervention function.
rng: A numpy random state for reproducibility. (Uses numpy's mtrand
random number generator by default.)
Returns:
X: An (n, m) array containing a realization of the trajectory of
the m dimensional system corresponding to the n times in
`time_points`. The first p rows contain the initial condition/
history of the system and count towards n.
"""
nsteps = len(time_points)

# Make sure that the simulation is not passed continuous time values
if np.any(np.round(time_points) != time_points):
raise ValueError("DiscreteTimeDynamics require integer time points")

# Initialize array of realizations of the trajectory.
X_do = np.zeros((nsteps, self.dim))
X_do[0] = initial_condition

for i in range(nsteps - 1):

# Apply intervention to current value
if intervention is not None:
X_do[i] = intervention(X_do[i], time_points[i])

# Compute next state
X_do[i+1] = self.step(X_do[i])

# After the loop, apply interention to the last step
if intervention is not None:
X_do[-1] = intervention(X_do[-1], time_points[-1])

return X_do

@abstractmethod
def step(self, x: np.ndarray):
"""Uses the current state to compute the next state of the system.
Args:
x (np.ndarray): The current state of the system.
Returns:
x_next (np.ndarray): The next state of the system.
"""
raise NotImplementedError()
151 changes: 151 additions & 0 deletions interfere/dynamics/statespace_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""State space dynamic models such as vector autoregression or VARIMAX.
"""
from typing import Callable, List, Optional

import numpy as np

from ..base import DynamicModel, DEFAULT_RANGE


class VARMA_Dynamics(DynamicModel):

def __init__(
self,
phi_matrices: List[np.ndarray],
theta_matrices: List[np.ndarray],
sigma: np.ndarray,
measurement_noise_std: Optional[np.ndarray] = None
):
"""Initializes a vector autoregressive moving average model.
z_n = a_n + sum_{i=1}^p phi_i z_(n-i) + sum_{i=1}^q theta_i a_{n-i}
with each a_n ~ MultivariateNormal(0, Sigma).
See S2.2 of the supplimental material in Cliff et. al 2023, "Unifying pairwise..."
Args:
phi_matrices: A list of p (nxn) numpy arrays. The autoregressive
components of the model. The ith array corresponds to phi_i in the formula above.
theta_matrices: A list of q (nxn) numpy arrays. The moving average
component of the model. The ith array corresponds to theta_i in
the formula above.
sigma: A (nxn) symmetric positive definite numpy array. The
covariance of the noise.
measurement_noise_std (ndarray): None, or a vector with shape (n,)
where each entry corresponds to the standard deviation of the
measurement noise for that particular dimension of the dynamic
model. For example, if the dynamic model had two variables x1
and x2 and `measurement_noise_std = [1, 10]`, then
independent gaussian noise with standard deviation 1 and 10
will be added to x1 and x2 respectively at each point in time.
"""
phi_dims = [n for phi_i in phi_matrices for n in phi_i.shape]
theta_dims = [n for theta_i in theta_matrices for n in theta_i.shape]
sigma_dims = [*sigma.shape]
n, m = phi_matrices[0].shape
if not np.all(np.hstack([phi_dims, theta_dims, sigma_dims]) == n):
raise ValueError("All theta and phi matrices and the sigma matrix"
" should be square and have the same dimension.")

self.phi_matrices = phi_matrices
self.theta_matrices = theta_matrices
self.sigma = sigma
super().__init__(n, measurement_noise_std)

def simulate(
self,
initial_condition: np.ndarray,
time_points: np.ndarray,
intervention: Optional[Callable[[np.ndarray, float], np.ndarray]]=None,
rng: np.random.mtrand.RandomState = DEFAULT_RANGE
) -> np.ndarray:
"""Simulates the VARMA model with arbitrary interventions.
Args:
initial_condition (ndarray): A (p, m) array of the initial
condition of the dynamic model.
time_points (ndarray): A (n,) array of the time points where the
dynamic model will be simulated. Must be integers
intervention (callable): A function that accepts (1) a vector of the
current state of the dynamic model and (2) the current time. It should return a modified state. The function will be used in the
following way:
If the dynamic model without the intervention can be described
as
x(t+dt) = F(x(t))
where dt is the timestep size, x(t) is the trajectory, and F is
the function that uses the current state to compute the state at
the next timestep. Then the intervention function will be used
to simulate the system
z(t+dt) = F(g(z(t), t), t)
x_do(t) = g(z(t), t)
where x_do is the trajectory of the intervened system and g is
the intervention function.
rng: A numpy random state for reproducibility. (Uses numpy's mtrand
random number generator by default.)
Returns:
X: An (n, m) array containing a realization of the trajectory of
the m dimensional system corresponding to the n times in
`time_points`. The first p rows contain the initial condition/
history of the system and count towards n.
"""
p, m = initial_condition.shape
q = len(self.theta_matrices)
n = len(time_points)
X = np.zeros((n, m))

# Assign initial condition
X[:p, :] = initial_condition

# Initialize noise
noise_vecs = rng.multivariate_normal(
np.zeros(m), self.sigma, n - p + q)

# Simulate
for i in range(n - p - 1):
x_AR = np.sum([
phi @ x_i
for x_i, phi in zip(X[i:(p+i)], self.phi_matrices[::-1])
])
x_MA = np.sum([
theta @ a_i
for a_i, theta in zip(
noise_vecs[i:(q+i)], self.theta_matrices[::-1])
])
x_next = noise_vecs[q+i+1] + x_AR + x_MA

X[p+i+1, :] = x_next

# Simulate
for i in range(n - p):

# Compute autogregressive component
x_AR = np.sum([
phi @ x_i
for x_i, phi in zip(X[i:(p+i)], self.phi_matrices[::-1])
], axis=0)

# Compute moving average (autocorrelated noise) component
x_MA = np.sum([
theta @ a_i
for a_i, theta in zip(
noise_vecs[i:(q+i)], self.theta_matrices[::-1])
], axis=0)

# Combine components and with one stochastic vector
x_next = noise_vecs[q+i] + x_AR + x_MA

# Optional intervention
if intervention is not None:
x_next = intervention(x_next)

X[p+i, :] = x_next

return X


46 changes: 45 additions & 1 deletion tests/test_models.py → tests/test_dynamics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import numpy as np
from scipy import integrate
import sdeint
from statsmodels.tsa.vector_ar.util import varsim

from interfere.dynamics import (
StandardNormalNoise,
Expand Down Expand Up @@ -247,4 +248,47 @@ def test_geometric_brownian_motion():

assert np.any(Xdo != X)
assert np.all(Xdo[:, 1:] == X[:, 1:])
assert np.all(Xdo[:, 0] == 10)
assert np.all(Xdo[:, 0] == 10)


def test_varma():
seed = 1
rs = np.random.RandomState(seed)

# Initialize a random VAR model
A1 = rs.rand(3, 3) - 0.5
A2 = rs.rand(3, 3) - 0.5
coefs = np.stack([A1, A2])
mu = np.zeros(3)
Z = rs.rand(3, 3)
Sigma = Z * Z.T
steps = 101
initial_vals = np.ones((2, 3))
nsims = 10000

# Simulate it
true_var_sim = varsim(
coefs,
mu,
Sigma,
steps=steps,
initial_values=initial_vals,
seed=seed,
nsimulations=nsims,
)

# Initialize a VARMA model with no moving average component
model = interfere.dynamics.VARMA_Dynamics(
phi_matrices=coefs,
theta_matrices=[np.zeros((3,3))],
sigma=Sigma
)
varma_sim = np.stack([
model.simulate(initial_vals, time_points=np.arange(steps))
for i in range(nsims)
], axis=0)
# Average over the 10000 simulations to compute the expected trajectory.
# Make sure it is equal for both models.
assert np.all(
np.abs(np.mean(true_var_sim - varma_sim, axis=0)) < 0.2
)
4 changes: 2 additions & 2 deletions tests/test_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_simulate_perfect_intervention_var():
# Average over the 10000 simulations to compute the expected trajectory.
# Make sure it is equal for both models.
assert np.all(
np.mean(true_var_sim - perf_inter_sim[:, :, :3], axis=0) < 0.1
np.abs(np.mean(true_var_sim - perf_inter_sim[:, :, :3], axis=0)) < 0.1
)

# Do a third simulation to double check that the above average doesn't hold
Expand All @@ -96,7 +96,7 @@ def test_simulate_perfect_intervention_var():
)

assert not np.all(
np.mean(true_var_sim - true_var_sim2, axis=0) < 0.1
np.abs(np.mean(true_var_sim - true_var_sim2, axis=0)) < 0.1
)

def test_benchmark_methods():
Expand Down

0 comments on commit 8827e0e

Please sign in to comment.