diff --git a/examples/07_State_Estimation.ipynb b/examples/07_State_Estimation.ipynb index a442adc..84cac76 100644 --- a/examples/07_State_Estimation.ipynb +++ b/examples/07_State_Estimation.ipynb @@ -6,6 +6,210 @@ "source": [ "# State Estimation" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Prognostics involves two key steps: 1) state estimation and 2) prediction. [State estimation](https://nasa.github.io/progpy/prog_algs_guide.html#prog-algs-guide:~:text=to%20toe%20prediction.-,State%20Estimation,-%23) is the process of estimating the current state of the system using sensor data and a prognostics model. The result is an estimate of the current state of the system with uncertainty. This estimate can then used by the predictor to perform prediction of future states and events. In this section, we describe state estimation and the tools within ProgPy to implement it. \n", + "\n", + "State estimation is the process of estimating the internal model state (`x`) using the inputs, outputs, and parameters of the system. This is necessary for cases where the model state isn't directly measurable (i.e. *hidden states*), or where there is sensor noise in state measurements. \n", + "\n", + "The foundation of state estimators is the estimate method. The estimate method is called with a time, inputs, and outputs. Each time estimate is called, the internal state estimate is updated. For example: `state_estimator.estimate(time, inputs, outputs)`, then the resulting state can be accessed using the property x (i.e., `state_estimator.estimate .x`).\n", + "\n", + "ProgPy includes a number of [state estimators](https://nasa.github.io/progpy/api_ref/prog_algs/StateEstimator.html). The most common techniques include Kalman Filters and Particle Filters. Users can also define their own custom state estimators. Examples of each of these are presented below. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Kalman Filter" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "One method for state estimation in ProgPy is using a [Kalman Filter (KF)](https://nasa.github.io/progpy/api_ref/prog_algs/StateEstimator.html#:~:text=Unscented%20Kalman%20Filter-,Kalman,-Filter). Kalman Filters are a simple, efficient state estimator for linear systems where state is represented by a mean and covariance matrix. The resulting state is represented by a [progpy.uncertain_data.MultivariateNormalDist](https://nasa.github.io/progpy/api_ref/progpy/UncertainData.html#progpy.uncertain_data.MultivariateNormalDist). Only works with Prognostic Models inheriting from [progpy.LinearModel](https://nasa.github.io/progpy/api_ref/progpy/LinearModel.html#progpy.LinearModel). " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To illustrate how to use a Kalman Filter for state estimation, we'll use a linear version of the ThrownObject model, and use the KF State estimator with fake data to estimate state.\n", + "\n", + "First, the necessary imports." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "from progpy.models.thrown_object import LinearThrownObject\n", + "from progpy.state_estimators import KalmanFilter" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's instantiate the model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "m = LinearThrownObject()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To instantiate the Kalman filter, we need an initial (i.e. starting) state. We'll define this as slightly off of the actual state, so first we'll print the acutal state in the model for our information. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print('Initial thrower height:', m.parameters['thrower_height'])\n", + "print('Initial speed:', m.parameters['throwing_speed'])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Given this, let's define our starting state for estimation. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "x_guess = m.StateContainer({'x': 1.75, 'v': 35}) # Slightly off of true x0" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can instantiate our Kalman filter. The Kalman filter requires two arguments, the prognostics model to be used in state estimation and an initial starting state. See [Kalman Filter Docs](https://nasa.github.io/progpy/api_ref/progpy/StateEstimator.html#progpy.state_estimators.KalmanFilter) for a full description of supported arguments." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "kf = KalmanFilter(m, x_guess)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "With this, we're ready to run the Kalman Filter state estimation. In the following, we'll use simulated data from the ThrownObject model. In a real application, we would be using sensor data from the system. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First, define the time step and pick a print frequency. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dt = 0.01 # time step (s)\n", + "print_freq = 50 # print every 50th iteration" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, initialize the state and input. Note that there is no input for this model, and thus it is defined as an empty InputContainer." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "x = m.initialize() # Initial state\n", + "u = m.InputContainer({}) # Initial input" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now let's run the Kalman filter. For each iteration, we'll first get the simulated output. (Note: In a real application, this would be a measured value.) Then we'll esimate the new state by calling the `estimate` method of the Kalman filter class, which takes input of the current timestamp, current input, and current output. The estimated state can then be accessed, and we print a comparison. Finally, we'll update the state, `x`. \n", + "\n", + "To visualize, we'll plot the error (i.e. the absolute value of the difference between the estimated state and the true state) throughout time. Notice that the error decreases as we progress through time. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig, (ax1, ax2) = plt.subplots(2) # Initialize figure\n", + "\n", + "for i in range(500):\n", + "\n", + " # Get simulated output (would be measured values in a real application)\n", + " z = m.output(x)\n", + "\n", + " # Estimate new state\n", + " kf.estimate(i*dt, u, z)\n", + " x_est = kf.x.mean\n", + "\n", + " # Print results \n", + " if i%print_freq == 0: # Print every print_freq'th iteration\n", + " print(f\"t: {i*dt:.2f}\\n\\tEstimate: {x_est}\\n\\tTruth: {x}\")\n", + " diff = {key: abs(x_est[key] - x[key]) for key in x.keys()}\n", + " print(f\"\\t Diff: {diff}\")\n", + " \n", + " ax1.plot(i*dt, diff['x'], '-ob')\n", + " ax2.plot(i*dt, diff['v'], '-*r')\n", + " ax1.set(xlabel='Time', ylabel='Error in x')\n", + " ax2.set(xlabel='Time', ylabel='Error in y')\n", + "\n", + " # Update real state for next step\n", + " x = m.next_state(x, u, dt)\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "With this, we have illustrated how to use a built-in Kalman filter to perform state estimation. Next, we'll show how to create a new, custom state estimator. " + ] } ], "metadata": { @@ -15,8 +219,16 @@ "name": "python3" }, "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", "name": "python", - "version": "3.11.0" + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" }, "orig_nbformat": 4, "vscode": { diff --git a/examples/kalman_filter.py b/examples/kalman_filter.py deleted file mode 100644 index 2c6ff9c..0000000 --- a/examples/kalman_filter.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright © 2021 United States Government as represented by the Administrator of the National Aeronautics and Space Administration. All Rights Reserved. - -""" -This example demonstrates use of the Kalman Filter State Estimator with a LinearModel. - -First, a linear model is defined. Then the KF State estimator is used with fake data to estimate state. -""" - -import numpy as np -from progpy import LinearModel -from progpy.state_estimators import KalmanFilter - - -# Linear Model for an object thrown into the air -class ThrownObject(LinearModel): - """ - Model that similates an object thrown into the air without air resistance - - Events (2) - | falling: The object is falling - | impact: The object has hit the ground - - Inputs/Loading: (0) - - States: (2) - | x: Position in space (m) - | v: Velocity in space (m/s) - - Outputs/Measurements: (1) - | x: Position in space (m) - - Keyword Args - ------------ - process_noise : Optional, float or Dict[Srt, float] - Process noise (applied at dx/next_state). - Can be number (e.g., .2) applied to every state, a dictionary of values for each - state (e.g., {'x1': 0.2, 'x2': 0.3}), or a function (x) -> x - process_noise_dist : Optional, String - distribution for process noise (e.g., normal, uniform, triangular) - measurement_noise : Optional, float or Dict[Srt, float] - Measurement noise (applied in output eqn). - Can be number (e.g., .2) applied to every output, a dictionary of values for each - output (e.g., {'z1': 0.2, 'z2': 0.3}), or a function (z) -> z - measurement_noise_dist : Optional, String - distribution for measurement noise (e.g., normal, uniform, triangular) - g : Optional, float - Acceleration due to gravity (m/s^2). Default is 9.81 m/s^2 (standard gravity) - thrower_height : Optional, float - Height of the thrower (m). Default is 1.83 m - throwing_speed : Optional, float - Speed at which the ball is thrown (m/s). Default is 40 m/s - """ - - inputs = [] # no inputs, no way to control - states = [ - 'x', # Position (m) - 'v' # Velocity (m/s) - ] - outputs = [ - 'x' # Position (m) - ] - events = [ - 'impact' # Event- object has impacted ground - ] - - A = np.array([[0, 1], [0, 0]]) - E = np.array([[0], [-9.81]]) - C = np.array([[1, 0]]) - F = None # Will override method - - # The Default parameters. - # Overwritten by passing parameters dictionary into constructor - default_parameters = { - 'thrower_height': 1.83, # m - 'throwing_speed': 40, # m/s - 'g': -9.81 # Acceleration due to gravity in m/s^2 - } - - def initialize(self, u=None, z=None): - return self.StateContainer({ - 'x': self.parameters['thrower_height'], - # Thrown, so initial altitude is height of thrower - 'v': self.parameters['throwing_speed'] - # Velocity at which the ball is thrown - this guy is a professional baseball pitcher - }) - - # This is actually optional. Leaving thresholds_met empty will use the event state to define thresholds. - # Threshold is met when Event State == 0. - # However, this implementation is more efficient, so we included it - def threshold_met(self, x): - return { - 'falling': x['v'] < 0, - 'impact': x['x'] <= 0 - } - - def event_state(self, x): - x_max = x['x'] + np.square(x['v'])/(-self.parameters['g']*2) # Use speed and position to estimate maximum height - return { - 'falling': np.maximum(x['v']/self.parameters['throwing_speed'],0), # Throwing speed is max speed - 'impact': np.maximum(x['x']/x_max,0) if x['v'] < 0 else 1 # 1 until falling begins, then it's fraction of height - } - -def run_example(): - # Step 1: Instantiate the model - m = ThrownObject(process_noise = 0, measurement_noise = 0) - - # Step 2: Instantiate the Kalman Filter State Estimator - # Define the initial state to be slightly off of actual - x_guess = m.StateContainer({'x': 1.75, 'v': 35}) # Guess of initial state - # Note: actual is {'x': 1.83, 'v': 40} - kf = KalmanFilter(m, x_guess) - - # Step 3: Run the Kalman Filter State Estimator - # Here we're using simulated data from the thrown_object. - # In a real application you would be using sensor data from the system - dt = 0.01 # Time step (s) - print_freq = 50 # Print every print_freq'th iteration - x = m.initialize() - u = m.InputContainer({}) # No input for this model - - for i in range(500): - # Get simulated output (would be measured in a real application) - z = m.output(x) - - # Estimate New State - kf.estimate(i*dt, u, z) - x_est = kf.x.mean - - # Print Results - if i%print_freq == 0: # Print every print_freq'th iteration - print(f"t: {i*dt:.2f}\n\tEstimate: {x_est}\n\tTruth: {x}") - diff = {key: x_est[key] - x[key] for key in x.keys()} - print(f"\t Diff: {diff}") - - # Update Real state for next step - x = m.next_state(x, u, dt) - -if __name__ == '__main__': - run_example()