Skip to content

Commit

Permalink
formatting and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
teubert committed Dec 15, 2023
1 parent 8125b16 commit 67cf424
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 55 deletions.
12 changes: 8 additions & 4 deletions src/progpy/composite_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ def output(self, x):
z = {}
for (name, m) in self.parameters['models']:
# Prepare state
x_i = m.StateContainer({key: x[name + '.' + key] for key in m.states})
x_i = m.StateContainer({
key: x[name + '.' + key] for key in m.states})

# Get outputs
z_i = m.output(x_i)
Expand All @@ -312,7 +313,8 @@ def performance_metrics(self, x) -> dict:
metrics = {}
for (name, m) in self.parameters['models']:
# Prepare state
x_i = m.StateContainer({key: x[name + '.' + key] for key in m.states})
x_i = m.StateContainer({
key: x[name + '.' + key] for key in m.states})

# Get outputs
metrics_i = m.performance_metrics(x_i)
Expand All @@ -326,7 +328,8 @@ def event_state(self, x) -> dict:
e = {}
for (name, m) in self.parameters['models']:
# Prepare state
x_i = m.StateContainer({key: x[name + '.' + key] for key in m.states})
x_i = m.StateContainer({
key: x[name + '.' + key] for key in m.states})

# Get outputs
e_i = m.event_state(x_i)
Expand All @@ -340,7 +343,8 @@ def threshold_met(self, x) -> dict:
tm = {}
for (name, m) in self.parameters['models']:
# Prepare state
x_i = m.StateContainer({key: x[name + '.' + key] for key in m.states})
x_i = m.StateContainer({
key: x[name + '.' + key] for key in m.states})

# Get outputs
tm_i = m.threshold_met(x_i)
Expand Down
53 changes: 29 additions & 24 deletions src/progpy/predictors/monte_carlo.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class MonteCarlo(Predictor):
'n_samples': None
}

def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **kwargs) -> PredictionResults:
def predict(self, state: UncertainData, future_loading_eqn: Callable=None, **kwargs) -> PredictionResults:
"""
Perform a single prediction
Expand Down Expand Up @@ -71,7 +71,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k
"""
if isinstance(state, dict) or isinstance(state, self.model.StateContainer):
from progpy.uncertain_data import ScalarData
state = ScalarData(state, _type = self.model.StateContainer)
state = ScalarData(state, _type=self.model.StateContainer)
elif isinstance(state, UncertainData):
state._type = self.model.StateContainer
else:
Expand All @@ -80,8 +80,8 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k
if future_loading_eqn is None:
future_loading_eqn = lambda t, x=None: self.model.InputContainer({})

params = deepcopy(self.parameters) # copy parameters
params.update(kwargs) # update for specific run
params = deepcopy(self.parameters) # copy parameters
params.update(kwargs) # update for specific run
params['print'] = False
params['progress'] = False

Expand Down Expand Up @@ -130,38 +130,40 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k
params['save_freq'] = (params['t0'], params['save_freq'])

if len(params['events']) == 0: # Predict to time
(times, inputs, states, outputs, event_states) = simulate_to_threshold(future_loading_eqn,
first_output,
threshold_keys = [],
(times, inputs, states, outputs, event_states) = simulate_to_threshold(
future_loading_eqn,
first_output,
threshold_keys=[],
**params
)
else:
events_remaining = params['events'].copy()

times = []
inputs = SimResult(_copy = False)
states = SimResult(_copy = False)
outputs = LazySimResult(fcn = self.model.output, _copy = False)
event_states = LazySimResult(fcn = es_eqn, _copy = False)
inputs = SimResult(_copy=False)
states = SimResult(_copy=False)
outputs = LazySimResult(fcn=self.model.output, _copy=False)
event_states = LazySimResult(fcn=es_eqn, _copy=False)

# Non-vectorized prediction
while len(events_remaining) > 0: # Still events to predict
# Since horizon is relative to t0 (the simulation starting point),
# we must subtract the difference in current t0 from the initial (i.e., prediction t0)
# each subsequent simulation
params['horizon'] = HORIZON - (params['t0'] - t0)
(t, u, xi, z, es) = simulate_to_threshold(future_loading_eqn,
first_output,
threshold_keys = events_remaining,
(t, u, xi, z, es) = simulate_to_threshold(
future_loading_eqn,
first_output,
threshold_keys=events_remaining,
**params
)

# Add results
times.extend(t)
inputs.extend(u)
states.extend(xi)
outputs.extend(z, _copy = False)
event_states.extend(es, _copy = False)
outputs.extend(z, _copy=False)
event_states.extend(es, _copy=False)

# Get which event occurs
t_met = tm_eqn(states[-1])
Expand All @@ -178,7 +180,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k

# An event has occured
time_of_event[event] = times[-1]
events_remaining.remove(event) # No longer an event to predect to
events_remaining.remove(event) # No longer an event to predict to

# Remove last state (event)
params['t0'] = times.pop()
Expand All @@ -189,7 +191,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k
event_states.pop()

# Add to "all" structures
if len(times) > len(times_all): # Keep longest
if len(times) > len(times_all): # Keep longest
times_all = times
inputs_all.append(inputs)
states_all.append(states)
Expand All @@ -206,14 +208,17 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k

# Transform final states:
time_of_event.final_state = {
key: UnweightedSamples([sample[key] for sample in last_states], _type = self.model.StateContainer) for key in time_of_event.keys()
key: UnweightedSamples(
[sample[key] for sample in last_states],
_type=self.model.StateContainer
) for key in time_of_event.keys()
}

return PredictionResults(
times_all,
inputs_all,
states_all,
outputs_all,
event_states_all,
times_all,
inputs_all,
states_all,
outputs_all,
event_states_all,
time_of_event
)
4 changes: 3 additions & 1 deletion src/progpy/predictors/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def __init__(self, model, **kwargs):
self.model = model

self.parameters = deepcopy(self.default_parameters)
self.parameters['events'] = self.model.events.copy() # Events to predict to
# Events to predict to - must be a list
# This is because of limitations with jsonify for sets
self.parameters['events'] = list(self.model.events.copy())
self.parameters.update(kwargs)

@abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion src/progpy/predictors/unscented_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def predict(self, state, future_loading_eqn: Callable = None, **kwargs) -> Predi
threshold_met = model.threshold_met
StateContainer = model.StateContainer

# Update State
# Update State
self.__state_keys = state_keys = state.mean.keys() # Used to maintain ordering as we strip keys and return
filt.x = [x for x in state.mean.values()]
filt.P = state.cov
Expand Down
62 changes: 38 additions & 24 deletions src/progpy/state_estimators/kalman_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
from warnings import warn

from progpy import LinearModel
from progpy.state_estimators import state_estimator
from progpy.uncertain_data import MultivariateNormalDist, UncertainData

from . import state_estimator
from ..uncertain_data import MultivariateNormalDist, UncertainData

class KalmanFilter(state_estimator.StateEstimator):
"""
A Kalman Filter (KF) for state estimation
This class defines the logic for performing a kalman filter with a LinearModel (see Prognostics Model Package). This filter uses measurement data with noise to generate a state estimate and covariance matrix.
This class defines the logic for performing a kalman filter with a linear model (i.e., a subclass of `progpy.LinearModel`). This filter uses measurement data with noise to generate a state estimate and covariance matrix.
The supported configuration parameters (keyword arguments) for UKF construction are described below:
Expand All @@ -35,20 +35,24 @@ class KalmanFilter(state_estimator.StateEstimator):
Maximum timestep for prediction in seconds. By default, the timestep dt is the difference between the last and current call of .estimate(). Some models are unstable at larger dt. Setting a smaller dt will force the model to take smaller steps; resulting in multiple prediction steps for each estimate step. Default is the parameters['dt']
e.g., dt = 1e-2
Q (list[list[float]], optional):
Kalman Process Noise Matrix
Kalman Process Noise Matrix
R (list[list[float]], optional):
Kalman Measurement Noise Matrix
"""
default_parameters = {
'alpha': 1,
'alpha': 1,
't0': -1e-10,
'dt': 1
}
}

def __init__(self, model, x0, **kwargs):
# Note: Measurement equation kept in constructor to keep it consistent with other state estimators. This way measurement equation can be provided as an ordered argument, and will just be ignored here
# Note: Measurement equation kept in constructor to keep it consistent
# with other state estimators. This way measurement equation can be
# provided as an ordered argument, and will just be ignored here
if not isinstance(model, LinearModel):
raise Exception('Kalman Filter only supports Linear Models (i.e., models derived from progpy.LinearModel)')
raise TypeError(
'Kalman Filter only supports Linear Models '
'(i.e., models derived from progpy.LinearModel)')

super().__init__(model, x0, **kwargs)

Expand All @@ -57,8 +61,8 @@ def __init__(self, model, x0, **kwargs):
if 'Q' not in self.parameters:
self.parameters['Q'] = np.diag([1.0e-3 for i in x0.keys()])
if 'R' not in self.parameters:
# Size of what's being measured (not output)
# This is determined by running the measure function on the first state
# Size of what's being measured (not output)
# This is determined by running measure() on the first state
self.parameters['R'] = np.diag([1.0e-3 for i in range(model.n_outputs)])

num_states = len(x0.keys())
Expand All @@ -67,30 +71,37 @@ def __init__(self, model, x0, **kwargs):
F = deepcopy(model.A)
B = deepcopy(model.B)
if np.size(B) == 0:
# If B is empty, replace with E.
# If B is empty, replace with E
# Append wont work if B is empty
B = deepcopy(model.E)
else:
B = np.append(B, deepcopy(model.E), 1)

self.filter = kalman.KalmanFilter(num_states, num_measurements, num_inputs)

self.__state_keys = list(x0.keys())
if isinstance(x0, dict) or isinstance(x0, model.StateContainer):
warn("Warning: Use UncertainData type if estimating filtering with uncertain data.")
self.filter.x = np.array([[x0[key]] for key in model.states]) # x0.keys()
self.filter.x = np.array([[x0[key]] for key in model.states])
self.filter.P = self.parameters['Q'] / 10
elif isinstance(x0, UncertainData):
x_mean = x0.mean
self.filter.x = np.array([[x_mean[key]] for key in model.states])

# Reorder covariance to be in same order as model.states
mapping = {i: list(x0.keys()).index(key) for i, key in enumerate(model.states)}
cov = x0.cov # Set covariance in case it has been calculated
mapped_cov = [[cov[mapping[i]][mapping[j]] for j in range(len(cov))] for i in range(len(cov))] # Set covariance based on mapping
mapping = {
i: list(x0.keys()).index(key)
for i, key in enumerate(model.states)}
# Set covariance in case it has been calculated
cov = x0.cov
# Set covariance based on mapping
mapped_cov = [
[cov[mapping[i]][mapping[j]]
for j in range(len(cov))] for i in range(len(cov))]
self.filter.P = np.array(mapped_cov)
else:
raise TypeError("TypeError: x0 initial state must be of type {{dict, UncertainData}}")
raise TypeError(
"TypeError: x0 initial state must be of type "
"{{dict, UncertainData}}")

self.filter.Q = self.parameters['Q']
self.filter.R = self.parameters['R']
Expand Down Expand Up @@ -121,7 +132,10 @@ def estimate(self, t: float, u, z, **kwargs):
"""
assert t > self.t, "New time must be greater than previous"
dt = kwargs.get('dt', self.parameters['dt'])
dt = min(t - self.t, dt) # Ensure dt is not larger than the maximum time step

# Ensure dt is not larger than the maximum time step
dt = min(t - self.t, dt)

# Create u array, ensuring order of model.inputs. And reshaping to (n,1), n can be 0.
inputs = np.array([u[key] for key in self.model.inputs]).reshape((-1,1))

Expand All @@ -136,19 +150,19 @@ def estimate(self, t: float, u, z, **kwargs):
# kalman_models is x' = Fx + Bu, where x' is the next state
# Therefore we need to add the diagnol matrix 1 to A to convert
# And A and B should be multiplied by the time step
B = np.multiply(self.filter.B, dt)
F = np.multiply(self.filter.F, dt) + np.diag([1]* self.model.n_states)
B = np.multiply(self.filter.B, dt)
F = np.multiply(self.filter.F, dt) + np.diag([1]*self.model.n_states)

# Predict
while self.t < t :
self.filter.predict(u = inputs, B = B, F = F)
while self.t < t:
self.filter.predict(u=inputs, B=B, F=F)
self.t += dt

# Create z array, ensuring order of model.outputs
outputs = np.array([z[key] for key in self.model.outputs])

# Subtract D from outputs
# This is done because progpy expects the form:
# This is done because progpy expects the form:
# z = Cx + D
# While kalman expects
# z = Cx
Expand All @@ -165,4 +179,4 @@ def x(self) -> MultivariateNormalDist:
-------
state = observer.x
"""
return MultivariateNormalDist(self.model.states, self.filter.x.ravel(), self.filter.P, _type = self.model.StateContainer)
return MultivariateNormalDist(self.model.states, self.filter.x.ravel(), self.filter.P, _type=self.model.StateContainer)
1 change: 0 additions & 1 deletion tests/test_uav_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from progpy.utils.traj_gen import geometry as geom

class TestUAVGen(unittest.TestCase):

def setUp(self):
# set stdout (so it wont print)
sys.stdout = StringIO()
Expand Down

0 comments on commit 67cf424

Please sign in to comment.