Port over changes
teubert committed Aug 17, 2023
1 parent b9e3ff5 commit f5cfec9
Showing 5 changed files with 131 additions and 28 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/python-package.yml
@@ -228,6 +228,24 @@ jobs:
       run: pip install --upgrade --upgrade-strategy eager -e .
     - name: Run tests
       run: python -m tests.test_examples
+  test_horizon:
+    timeout-minutes: 5
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+    - name: Set up Python
+      uses: actions/setup-python@v4
+      with:
+        python-version: '3.7'
+    - name: Install dependencies cache
+      uses: actions/cache@v2
+      with:
+        path: ~/.cache/pip
+        key: pip-cache
+    - name: Update
+      run: pip install --upgrade --upgrade-strategy eager -e .
+    - name: Run tests
+      run: python -m tests.test_horizon
   test_linear_model:
     timeout-minutes: 5
     runs-on: ubuntu-latest
37 changes: 12 additions & 25 deletions examples/horizon.py
@@ -12,48 +12,35 @@
     ii) Time event is predicted to occur (with uncertainty)
 """

+import numpy as np
 from progpy.models.thrown_object import ThrownObject
-from progpy import *
+from progpy.predictors import MonteCarlo
+from progpy.uncertain_data import MultivariateNormalDist
 from pprint import pprint

 def run_example():
-    # Step 1: Setup model & future loading
+    # Step 1: Setup model, future loading, and state
     def future_loading(t, x = None):
         return {}
-    m = ThrownObject(process_noise = 0.25, measurement_noise = 0.2)
+    m = ThrownObject(process_noise = 0.5, measurement_noise = 0.15)
     initial_state = m.initialize()

-    # Step 2: Demonstrating state estimator
-    print("\nPerforming State Estimation Step...")
-
-    # Step 2a: Setup
     NUM_SAMPLES = 1000
-    filt = state_estimators.ParticleFilter(m, initial_state, num_particles = NUM_SAMPLES)
-    # VVV Uncomment this to use UKF State Estimator VVV
-    # filt = state_estimators.UnscentedKalmanFilter(batt, initial_state)
-
-    # Step 2b: One step of state estimator
-    u = m.InputContainer({})  # No input for ThrownObject
-    filt.estimate(0.1, u, m.output(initial_state))
-
-    # Note: in a prognostic application the above state estimation
-    # step would be repeated each time there is new data.
-    # Here we're doing one step to demonstrate how the state estimator is used
+    x = MultivariateNormalDist(initial_state.keys(), initial_state.values(), np.diag([x_i*0.01 for x_i in initial_state.values()]))

-    # Step 3: Demonstrating Predictor
+    # Step 2: Demonstrating Predictor
     print("\nPerforming Prediction Step...")

-    # Step 3a: Setup Predictor
-    mc = predictors.MonteCarlo(m)
+    # Step 2a: Setup Predictor
+    mc = MonteCarlo(m)

-    # Step 3b: Perform a prediction
+    # Step 2b: Perform a prediction
     # THIS IS WHERE WE DIVERGE FROM THE THROWN_OBJECT_EXAMPLE
     # Here we set a prediction horizon
     # We're saying we are not interested in any events that occur after this time
-    PREDICTION_HORIZON = 7.75
-    samples = filt.x  # Since we're using a particle filter, which is also sample-based, we can directly use the samples, without changes
+    PREDICTION_HORIZON = 7.7
     STEP_SIZE = 0.01
-    mc_results = mc.predict(samples, future_loading, dt=STEP_SIZE, horizon = PREDICTION_HORIZON)
+    mc_results = mc.predict(x, future_loading, n_samples=NUM_SAMPLES, dt=STEP_SIZE, horizon = PREDICTION_HORIZON)
     print("\nPredicted Time of Event:")
     metrics = mc_results.time_of_event.metrics()
     pprint(metrics)  # Note this takes some time
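For context on the example change above: the state-estimation step is dropped entirely, and prediction now starts from an analytic initial-state distribution rather than particle-filter samples, with the predictor drawing its own samples via n_samples. A minimal sketch of the updated flow, condensed from the diff above (the 1% relative variance comes straight from the np.diag line):

    import numpy as np
    from progpy.models.thrown_object import ThrownObject
    from progpy.predictors import MonteCarlo
    from progpy.uncertain_data import MultivariateNormalDist

    m = ThrownObject(process_noise=0.5, measurement_noise=0.15)
    x0 = m.initialize()

    # Multivariate normal centered on the initial state, 1% relative variance per state
    x = MultivariateNormalDist(x0.keys(), x0.values(),
                               np.diag([xi * 0.01 for xi in x0.values()]))

    # The predictor samples from the distribution itself; no state estimator needed
    mc = MonteCarlo(m)
    mc_results = mc.predict(x, lambda t, x=None: {},   # no input for ThrownObject
                            n_samples=1000, dt=0.01, horizon=7.7)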
23 changes: 20 additions & 3 deletions src/progpy/predictors/monte_carlo.py
@@ -35,8 +35,10 @@ class MonteCarlo(Predictor):
         Any additional savepoints (s) e.g., [10.1, 22.5]
     """

+    __DEFAULT_N_SAMPLES = 100  # Default number of samples to use, if none specified and state is not UnweightedSamples
+
     default_parameters = {
-        'n_samples': 100  # Default number of samples to use, if none specified
+        'n_samples': None
     }

     def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs) -> PredictionResults:
@@ -53,11 +53,20 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs)
         params['print'] = False
         params['progress'] = False

+        if not isinstance(state, UnweightedSamples) and params['n_samples'] is None:
+            # if not unweighted samples, some sample number is required, so set to default
+            params['n_samples'] = MonteCarlo.__DEFAULT_N_SAMPLES
+        elif isinstance(state, UnweightedSamples) and params['n_samples'] is None:
+            params['n_samples'] = len(state)  # number of samples comes from the provided state
+
         if len(params['events']) == 0 and 'horizon' not in params:
             raise ValueError("If specifying no event (i.e., simulate to time), must specify horizon")

-        # Sample from state if n_samples specified or state is not UnweightedSamples
-        if not isinstance(state, UnweightedSamples) or len(state) != params['n_samples']:
+        # Sample from state if state is not UnweightedSamples (Case 2),
+        # or if it is UnweightedSamples but with the wrong number of samples (Case 1)
+        if (
+            (isinstance(state, UnweightedSamples) and len(state) != params['n_samples'])  # Case 1
+                or not isinstance(state, UnweightedSamples)):  # Case 2
             state = state.sample(params['n_samples'])

         es_eqn = self.model.event_state
@@ -74,6 +74,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs)

         # Perform prediction
         t0 = params.get('t0', 0)
+        HORIZON = params.get('horizon', float('inf'))  # Save the horizon to be used later
         for x in state:
             first_output = self.model.output(x)

@@ -82,6 +82,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs)

             params['t0'] = t0
             params['x'] = x
+            params['horizon'] = HORIZON  # reset to initial horizon

             if 'save_freq' in params and not isinstance(params['save_freq'], tuple):
                 params['save_freq'] = (params['t0'], params['save_freq'])
@@ -103,6 +103,10 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs)

             # Non-vectorized prediction
             while len(events_remaining) > 0:  # Still events to predict
+                # Since horizon is relative to t0 (the simulation starting point),
+                # we must subtract the difference between the current t0 and the
+                # initial (i.e., prediction) t0 on each subsequent simulation
+                params['horizon'] = HORIZON - (params['t0'] - t0)
                 (t, u, xi, z, es) = simulate_to_threshold(future_loading_eqn,
                     first_output,
                     threshold_keys = events_remaining,
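Two behaviors introduced above are worth restating. First, n_samples now defaults to None and is resolved per call: an UnweightedSamples state keeps its own sample count, while any other UncertainData falls back to the class default of 100 unless a count is given explicitly. Second, because each simulate_to_threshold call restarts at the time of the previous event, the remaining horizon shrinks on every pass. A standalone sketch of both rules (resolve_n_samples and remaining_horizon are illustrative helpers, not progpy API):

    from progpy.uncertain_data import UnweightedSamples

    DEFAULT_N_SAMPLES = 100  # mirrors MonteCarlo.__DEFAULT_N_SAMPLES above

    def resolve_n_samples(state, n_samples=None):
        # Illustrative helper mirroring the branch added to predict()
        if n_samples is not None:
            return n_samples
        if isinstance(state, UnweightedSamples):
            return len(state)  # reuse the sample count carried by the state
        return DEFAULT_N_SAMPLES  # any other UncertainData needs an explicit count

    def remaining_horizon(horizon, t0_initial, t0_current):
        # Horizon is relative to the prediction start, so each follow-on
        # simulation gets the original horizon minus the time already elapsed
        return horizon - (t0_current - t0_initial)

    # e.g., a 7.7 s horizon starting at t0 = 0: after a first event at t = 3.2,
    # the next simulate_to_threshold call runs with horizon 7.7 - 3.2 = 4.5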
6 changes: 6 additions & 0 deletions tests/__main__.py
@@ -11,6 +11,7 @@
 from tests.test_ensemble import main as ensemble_main
 from tests.test_estimate_params import main as estimate_params_main
 from tests.test_examples import main as examples_main
+from tests.test_horizon import main as horizon_main
 from tests.test_linear_model import main as linear_main
 from tests.test_metrics import main as metrics_main
 from tests.test_pneumatic_valve import main as pneumatic_valve_main
@@ -77,6 +78,11 @@
     except Exception:
         was_successful = False

+    try:
+        horizon_main()
+    except Exception:
+        was_successful = False
+
     try:
         linear_main()
     except Exception:
75 changes: 75 additions & 0 deletions tests/test_horizon.py
@@ -0,0 +1,75 @@
from io import StringIO
import sys
import unittest

from progpy import predictors
from progpy.models import ThrownObject

class TestHorizon(unittest.TestCase):
    def setUp(self):
        # set stdout (so it won't print)
        sys.stdout = StringIO()

    def tearDown(self):
        sys.stdout = sys.__stdout__

    def test_horizon_ex(self):
        # Setup model
        m = ThrownObject(process_noise=0.25, measurement_noise=0.2)
        # Change parameters (to make simulation faster)
        m.parameters['thrower_height'] = 1.0
        m.parameters['throwing_speed'] = 10.0
        initial_state = m.initialize()

        # Define future loading (necessary for prediction call)
        def future_loading(t, x=None):
            return {}

        # Setup Predictor (smaller sample size for efficiency)
        mc = predictors.MonteCarlo(m)
        mc.parameters['n_samples'] = 50

        # Perform a prediction
        # With this horizon, all samples will reach 'falling', but only some will reach 'impact'
        PREDICTION_HORIZON = 2.127
        STEP_SIZE = 0.001
        mc_results = mc.predict(initial_state, future_loading, dt=STEP_SIZE, horizon = PREDICTION_HORIZON)

        # 'falling' happens before the horizon is met
        falling_res = [mc_results.time_of_event[iter]['falling'] for iter in range(mc.parameters['n_samples']) if mc_results.time_of_event[iter]['falling'] is not None]
        self.assertEqual(len(falling_res), mc.parameters['n_samples'])

        # 'impact' happens around the horizon, so some samples have reached this event and others haven't
        impact_res = [mc_results.time_of_event[iter]['impact'] for iter in range(mc.parameters['n_samples']) if mc_results.time_of_event[iter]['impact'] is not None]
        self.assertLess(len(impact_res), mc.parameters['n_samples'])

        # Try again with a very low prediction horizon, where no events are reached
        # Note: here we count how many None values there are for each event (in the above and below examples, we count values that are NOT None)
        mc_results_no_event = mc.predict(initial_state, future_loading, dt=STEP_SIZE, horizon=0.3)
        falling_res_no_event = [mc_results_no_event.time_of_event[iter]['falling'] for iter in range(mc.parameters['n_samples']) if mc_results_no_event.time_of_event[iter]['falling'] is None]
        impact_res_no_event = [mc_results_no_event.time_of_event[iter]['impact'] for iter in range(mc.parameters['n_samples']) if mc_results_no_event.time_of_event[iter]['impact'] is None]
        self.assertEqual(len(falling_res_no_event), mc.parameters['n_samples'])
        self.assertEqual(len(impact_res_no_event), mc.parameters['n_samples'])

        # Finally, try without a horizon; all events should be reached for all samples
        mc_results_all_event = mc.predict(initial_state, future_loading, dt=STEP_SIZE)
        falling_res_all_event = [mc_results_all_event.time_of_event[iter]['falling'] for iter in range(mc.parameters['n_samples']) if mc_results_all_event.time_of_event[iter]['falling'] is not None]
        impact_res_all_event = [mc_results_all_event.time_of_event[iter]['impact'] for iter in range(mc.parameters['n_samples']) if mc_results_all_event.time_of_event[iter]['impact'] is not None]
        self.assertEqual(len(falling_res_all_event), mc.parameters['n_samples'])
        self.assertEqual(len(impact_res_all_event), mc.parameters['n_samples'])

# This allows the module to be executed directly
def run_tests():
    unittest.main()

def main():
    load_test = unittest.TestLoader()
    runner = unittest.TextTestRunner()
    print("\n\nTesting Horizon functionality")
    result = runner.run(load_test.loadTestsFromTestCase(TestHorizon)).wasSuccessful()

    if not result:
        raise Exception("Failed test")

if __name__ == '__main__':
    main()
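The three predict calls above exercise the horizon trichotomy: a mid-range horizon ('falling' reached by all samples, 'impact' by only some), a tiny horizon (no events reached), and no horizon (all events reached by all samples). The counting pattern the assertions repeat could be factored as below; count_reached is our illustration, not part of progpy or this test:

    def count_reached(results, event, n_samples):
        # Count samples whose predicted time for `event` is not None,
        # i.e., the event occurred before the prediction horizon
        return sum(1 for i in range(n_samples)
                   if results.time_of_event[i][event] is not None)

    # With the mid-range horizon above:
    #   count_reached(mc_results, 'falling', 50) == 50  (all samples reach 'falling')
    #   count_reached(mc_results, 'impact', 50) < 50    (only some reach 'impact')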
