Skip to content

Commit

Permalink
Merge pull request #78 from nasa/feature/empty_future_loading
Browse files Browse the repository at this point in the history
Make future loading optional in prediction
  • Loading branch information
teubert authored Aug 18, 2023
2 parents 670e1cf + a34172e commit ff234f0
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 40 deletions.
7 changes: 2 additions & 5 deletions examples/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
def run_example():
# Step 1: Setup model & future loading
m = ThrownObject(process_noise = 1)
def future_loading(t, x = None):
# No load for a thrown object
return m.InputContainer({})
initial_state = m.initialize()

# Step 2: Demonstrating state estimator
Expand All @@ -42,7 +39,7 @@ def future_loading(t, x = None):
# Step 2c: Perform state estimation step, given some measurement, above what's expected
example_measurements = m.OutputContainer({'x': 7.5})
t = 0.1
u = future_loading(t)
u = m.InputContainer({})
filt.estimate(t, u, example_measurements) # Update state, given (example) sensor data

# Step 2d: Print & Plot Resulting Posterior State
Expand All @@ -65,7 +62,7 @@ def future_loading(t, x = None):
# Step 3b: Perform a prediction
NUM_SAMPLES = 50
STEP_SIZE = 0.01
mc_results = mc.predict(filt.x, future_loading, n_samples = NUM_SAMPLES, dt=STEP_SIZE, save_freq=STEP_SIZE)
mc_results = mc.predict(filt.x, n_samples = NUM_SAMPLES, dt=STEP_SIZE, save_freq=STEP_SIZE)
print('Predicted time of event (ToE): ', mc_results.time_of_event.mean)
# Here there are 2 events predicted, when the object starts falling, and when it impacts the ground.

Expand Down
6 changes: 2 additions & 4 deletions examples/horizon.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

def run_example():
# Step 1: Setup model & future loading
def future_loading(t, x = None):
return {}
m = ThrownObject(process_noise = 0.25, measurement_noise = 0.2)
m = ThrownObject(process_noise=0.25, measurement_noise=0.2)
initial_state = m.initialize()

# Step 2: Demonstrating state estimator
Expand Down Expand Up @@ -53,7 +51,7 @@ def future_loading(t, x = None):
PREDICTION_HORIZON = 7.75
samples = filt.x # Since we're using a particle filter, which is also sample-based, we can directly use the samples, without changes
STEP_SIZE = 0.01
mc_results = mc.predict(samples, future_loading, dt=STEP_SIZE, horizon = PREDICTION_HORIZON)
mc_results = mc.predict(samples, dt=STEP_SIZE, horizon=PREDICTION_HORIZON)
print("\nPredicted Time of Event:")
metrics = mc_results.time_of_event.metrics()
pprint(metrics) # Note this takes some time
Expand Down
4 changes: 1 addition & 3 deletions examples/predict_specific_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ def run_example():
m = ThrownObject()
initial_state = m.initialize()
load = m.InputContainer({}) # Optimization - create once
def future_loading(t, x = None):
return load

## State Estimation - perform a single ukf state estimate step
filt = state_estimators.UnscentedKalmanFilter(m, initial_state)
Expand All @@ -24,7 +22,7 @@ def future_loading(t, x = None):
pred = predictors.UnscentedTransformPredictor(m)

# Predict with a step size of 0.1
mc_results = pred.predict(filt.x, future_loading, dt=0.1, save_freq= 1, events=['impact'])
mc_results = pred.predict(filt.x, dt=0.1, save_freq= 1, events=['impact'])

# Print Results
for i, time in enumerate(mc_results.times):
Expand Down
14 changes: 5 additions & 9 deletions examples/sensitivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,18 @@ def run_example():
# Step 1: Create instance of model
m = ThrownObject()

# Step 2: Setup for simulation
def future_load(t, x=None):
return m.InputContainer({})

# Step 3: Setup range on parameters considered
# Step 2: Setup range on parameters considered
thrower_height_range = np.arange(1.2, 2.1, 0.1)

# Step 4: Sim for each
# Step 3: Sim for each
event = 'impact'
eods = np.empty(len(thrower_height_range))
for (i, thrower_height) in zip(range(len(thrower_height_range)), thrower_height_range):
m.parameters['thrower_height'] = thrower_height
simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt =1e-3, save_freq =10)
simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt =1e-3, save_freq =10)
eods[i] = simulated_results.times[-1]

# Step 5: Analysis
# Step 4: Analysis
print('For a reasonable range of heights, impact time is between {} and {}'.format(round(eods[0],3), round(eods[-1],3)))
sensitivity = (eods[-1]-eods[0])/(thrower_height_range[-1] - thrower_height_range[0])
print(' - Average sensitivity: {} s per cm height'.format(round(sensitivity/100, 6)))
Expand All @@ -40,7 +36,7 @@ def future_load(t, x=None):
eods = np.empty(len(throw_speed_range))
for (i, throw_speed) in zip(range(len(throw_speed_range)), throw_speed_range):
m.parameters['throwing_speed'] = throw_speed
simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], options={'dt':1e-3, 'save_freq':10})
simulated_results = m.simulate_to_threshold(threshold_keys=[event], options={'dt':1e-3, 'save_freq':10})
eods[i] = simulated_results.times[-1]

print('\nFor a reasonable range of throwing speeds, impact time is between {} and {}'.format(round(eods[0],3), round(eods[-1],3)))
Expand Down
12 changes: 4 additions & 8 deletions examples/state_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ def run_example():
# Step 1: Create instance of model (without drag)
m = ThrownObject( cd = 0 )

# Step 2: Setup for simulation
def future_load(t, x=None):
return {}

# add state limits
# Step 2: add state limits
m.state_limits = {
# object may not go below ground height
'x': (0, inf),
Expand All @@ -30,7 +26,7 @@ def future_load(t, x=None):

# Step 3: Simulate to impact
event = 'impact'
simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt=0.005, save_freq=1)
simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt=0.005, save_freq=1)

# Print states
print('Example 1')
Expand All @@ -42,7 +38,7 @@ def future_load(t, x=None):
x0 = m.initialize(u = {}, z = {})
x0['x'] = -1

simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt=0.005, save_freq=1, x = x0)
simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt=0.005, save_freq=1, x=x0)

# Print states
print('Example 2')
Expand All @@ -57,7 +53,7 @@ def future_load(t, x=None):
m.parameters['g'] = -50000000

print('Example 3')
simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt=0.005, save_freq=0.3, x = x0, print = True, progress = False)
simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt=0.005, save_freq=0.3, x=x0, print=True, progress=False)

# Note that the limits can also be applied manually using the apply_limits function
print('limiting states')
Expand Down
7 changes: 5 additions & 2 deletions src/progpy/predictors/monte_carlo.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ class MonteCarlo(Predictor):
'n_samples': None
}

def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs) -> PredictionResults:
def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **kwargs) -> PredictionResults:
"""
Perform a single prediction
Parameters
----------
state : UncertainData
Distribution representing current state of the system
future_loading_eqn : function (t, x) -> z
future_loading_eqn : function (t, x=None) -> z, optional
Function to generate an estimate of loading at future time t, and state x
Keyword Arguments
Expand Down Expand Up @@ -77,6 +77,9 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs)
else:
raise TypeError("state must be UncertainData, dict, or StateContainer")

if future_loading_eqn is None:
future_loading_eqn = lambda t, x=None: self.model.InputContainer({})

params = deepcopy(self.parameters) # copy parameters
params.update(kwargs) # update for specific run
params['print'] = False
Expand Down
7 changes: 5 additions & 2 deletions src/progpy/predictors/unscented_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ def state_transition(x, dt):
self.filter = kalman.UnscentedKalmanFilter(num_states, num_measurements, self.parameters['dt'], measure, state_transition, self.sigma_points)
self.filter.Q = self.parameters['Q']

def predict(self, state, future_loading_eqn: Callable, **kwargs) -> PredictionResults:
def predict(self, state, future_loading_eqn: Callable = None, **kwargs) -> PredictionResults:
"""
Perform a single prediction
Parameters
----------
state (UncertaintData): Distribution of states
future_loading_eqn : function (t, x={}) -> z
future_loading_eqn: function (t, x=None) -> z, optional
Function to generate an estimate of loading at future time t
options (optional, kwargs): configuration options\n
Any additional configuration values. Note: These parameters can also be specified in the predictor constructor. The following configuration parameters are supported: \n
Expand Down Expand Up @@ -169,6 +169,9 @@ def predict(self, state, future_loading_eqn: Callable, **kwargs) -> PredictionRe
else:
raise TypeError("state must be UncertainData, dict, or StateContainer")

if future_loading_eqn is None:
future_loading_eqn = lambda t, x=None: self.model.InputContainer({})

params = deepcopy(self.parameters) # copy parameters
params.update(kwargs) # update for specific run

Expand Down
12 changes: 5 additions & 7 deletions tests/test_predictors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ def test_UTP_ThrownObject(self):
m = ThrownObject()
pred = UnscentedTransformPredictor(m)
samples = MultivariateNormalDist(['x', 'v'], [1.83, 40], [[0.1, 0.01], [0.01, 0.1]])
def future_loading(t, x={}):
return {}

mc_results = pred.predict(samples, future_loading, dt=0.01, save_freq=1)
# No future loading (i.e., no load)
mc_results = pred.predict(samples, dt=0.01, save_freq=1)
self.assertAlmostEqual(mc_results.time_of_event.mean['impact'], 8.21, 0)
self.assertAlmostEqual(mc_results.time_of_event.mean['falling'], 4.15, 0)
# self.assertAlmostEqual(mc_results.times[-1], 9, 1) # Saving every second, last time should be around the 1s after impact event (because one of the sigma points fails afterwards)
Expand Down Expand Up @@ -126,10 +125,9 @@ def future_loading(t, x=None):
def test_MC(self):
m = ThrownObject()
mc = MonteCarlo(m)
def future_loading(t=None, x=None):
return {}

mc.predict(m.initialize(), future_loading, dt=0.2, num_samples=3, save_freq=1)

# Test with empty future loading (i.e., no load)
mc.predict(m.initialize(), dt=0.2, num_samples=3, save_freq=1)

def test_prediction_mvnormaldist(self):
times = list(range(10))
Expand Down

0 comments on commit ff234f0

Please sign in to comment.