{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:42.263964Z", "start_time": "2019-11-01T16:12:42.082356Z" } }, "outputs": [], "source": [ "%matplotlib inline\n", "import numpy as np\n", "import scipy as sp\n", "from scipy.linalg import expm \n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:42.504529Z", "start_time": "2019-11-01T16:12:42.317034Z" } }, "outputs": [], "source": [ "import sys\n", "sys.path.append('../../')\n", "import SpringMassDamper\n", "import dmdlab\n", "\n", "from dmdlab import DMD, DMDc, delay_embed, plot_eigs, cts_from_dst" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:42.315139Z", "start_time": "2019-11-01T16:12:42.265692Z" } }, "outputs": [], "source": [ "color = plt.rcParams['axes.prop_cycle'].by_key()['color'] # store color array" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Ex 1: Sinusoid" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we show off the importance of delay embeddings.\n", "\n", "If a real-valued operator A doesn't have sufficient dimensionality, then it cannot produce the necessary complex conjugate pair of roots required to produce oscillatory time dynamics. For example, suppose we are measuring a 1D sine wave. The DMD operator A would have only one real root. The operator must be augmented to capture the oscillation." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:43.030157Z", "start_time": "2019-11-01T16:12:42.512073Z" } }, "outputs": [], "source": [ "fig, axes = plt.subplots(1, 3, figsize=[8,2])\n", "fig.subplots_adjust(hspace=2)\n", "\n", "ts = np.linspace(0,2*np.pi, 200)\n", "data = np.sin(1e-2*ts) + 0.25*np.sin(ts)**7 # powers -> add'l freq for each extra power\n", "data = data.reshape(1,-1)\n", "\n", "# 1 - Regular DMD\n", "model = DMD.from_full(data, ts)\n", "axes[0].set_title('Zero delays')\n", "axes[0].plot(ts, model.predict_dst(ts)[0].real, c=color[0])\n", "axes[0].plot(model.orig_timesteps, model.X1[0], ls=':', c='k')\n", "axes[0].set_xlim([0,2*np.pi])\n", "\n", "# 2- Delay embed\n", "shift = 15 # shift + 1 is number of eigenvalues (2*7 + 2)\n", "ts1 = ts[shift:]\n", "model = DMD.from_full(delay_embed(data, shift), ts[shift:])\n", "axes[1].set_title('{} delays'.format(shift))\n", "axes[1].plot(ts1, model.predict_dst(ts1)[0].real, c=color[0])\n", "axes[1].plot(model.orig_timesteps, model.X1[0], ls=':', c='k')\n", "axes[1].set_xlim([0,2*np.pi])\n", "\n", "# plot long times (captures low frequency, too)\n", "axes[2].set_title('Captures low and high freq.')\n", "ts = np.linspace(ts1[0],1e2*np.pi, 1000)\n", "axes[2].plot(ts, model.predict_dst(ts)[0].real, c=color[0])\n", "\n", "# toss in an eigenvalue plot at the end\n", "plot_eigs(model.eigs, figsize=[4,3]);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Keeping with sine, let's address the addition of noise and the importance of thresholding." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:43.115006Z", "start_time": "2019-11-01T16:12:43.031747Z" } }, "outputs": [], "source": [ "fig, ax = plt.subplots(1,1, figsize=[5,3])\n", "\n", "ts = np.linspace(0,2*np.pi, 200)\n", "data = np.sin(ts) + .1*np.random.randn(len(ts))\n", "data = data.reshape(1,-1)\n", "\n", "# 2- Delay embed\n", "shift = 25\n", "ts1 = ts[shift:]\n", "model = DMD.from_full(delay_embed(data, shift), ts[shift:], threshold=2, threshold_type='count')\n", "\n", "ax.plot(ts1, model.predict_dst(ts1)[-1].real, c=color[0], label='Prediction')\n", "ax.plot(ts, data[0], ls=':', c='k', label='Data')\n", "ax.set_xlim([0,2*np.pi])\n", "ax.legend()\n", "\n", "plot_eigs(model.eigs, figsize=[5,3]);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the average DMD operator to correct for noisey errors." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dmdA_list = []\n", "data_list = []\n", "predict_list = []\n", "for aModel in range(50):\n", " ts = np.linspace(0,2*np.pi, 200)\n", " data = np.sin(ts) + .1*np.random.randn(len(ts))\n", " data_list.append(data)\n", " data = data.reshape(1,-1)\n", " \n", " # 2- Delay embed\n", " shift = 29\n", " model = DMD.from_full(delay_embed(data, shift), ts[shift:], threshold=2, threshold_type='count')\n", " predict_list.append(model.predict_dst(ts[shift:])[-1].real)\n", " dmdA_list.append(model.A)\n", "data_list = np.array(data_list)\n", "predict_list = np.array(predict_list)\n", "\n", "A = np.mean(dmdA_list,axis=0)\n", "ctsA, _ = cts_from_dst(A, np.zeros_like(A), ts[1]-ts[0])\n", "A_predict = [(expm(ctsA*t)@model.X1[:,0]).real[0] for t in ts[shift:]]\n", "\n", "fig, ax = plt.subplots(1,1, figsize=[5,3])\n", "ax.plot(ts[shift:], A_predict, c='k', label='Avg. DMD predict.')\n", "ax.fill_between(ts[shift:], np.min(predict_list, axis=0), np.max(predict_list, axis=0),\n", " alpha=0.5, color=color[0], label='DMD predict.')\n", "ax.fill_between(ts, np.min(data_list, axis=0), np.max(data_list, axis=0), \n", " alpha=0.5, color='gray', label='data')\n", "ax.legend()\n", "ax.set_xlim([0,2*np.pi])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Ex 2: Spring-Mass-Damper" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll show how DMD works with a physical system where we have intuition for forcing." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:44.190029Z", "start_time": "2019-11-01T16:12:43.118771Z" } }, "outputs": [], "source": [ "# Initialize system\n", "smd = SpringMassDamper.spring_mass_damper({'mass': 10, 'spring': 1, 'damper': 1})\n", "y0 = np.array([0, 2]) # kick it\n", "\n", "# Choose times (these are universally used throughout this section for the control pulse)\n", "t_span = [0,100]\n", "dt = .1\n", "ts = np.linspace(*t_span, 1000)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, compute the transfer function for a linear state space system under forcing. We'll plot the transfer function to find interesting frequencies by inspection" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:45.382741Z", "start_time": "2019-11-01T16:12:44.192510Z" } }, "outputs": [], "source": [ "# Transfer function for a linear state space system\n", "G = lambda s: np.linalg.inv(s%(2*np.pi)*np.identity(2)-smd.A)@smd.B\n", "\n", "fig, ax = plt.subplots(1, figsize=[6,3])\n", "fig.tight_layout(rect=[0.05,0.05,.95,.95])\n", "freq = np.linspace(0, 43/7, 100)\n", "ax.plot(freq, [G(s)[1] for s in freq])\n", "ax.set_xlabel('Frequency')\n", "ax.set_ylabel('Transfer fn.')\n", "\n", "# Plot some frequencies\n", "fig, axes = plt.subplots(2,2,figsize=[6,5])\n", "fig.subplots_adjust(hspace=1)\n", "for ax, freq in zip(axes.flatten(), [0,.1,.3,2]):\n", " # Run simulation\n", " smd.set_control(ts, 2*np.sin(freq*ts))\n", " res = smd.simulate(y0, t_span, dt, True)\n", " \n", " # Plot result\n", " ax.set_title('Control Freq = {}'.format(freq))\n", " ax.plot(smd.t, smd.x)\n", " ax.plot(smd.t, smd.u(smd.t), alpha=0.5)\n", " \n", "fig.legend(['Signal', 'Control'], fontsize=14, loc='center', ncol=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's look at the SMD without any external forcing. This example will show how time delays are similar to including a measured derivative term. (From left to right, we have DMD default, DMD only measuring position, and DMD with a time delay to capture position and it's derivative)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:45.754704Z", "start_time": "2019-11-01T16:12:45.384021Z" } }, "outputs": [], "source": [ "fig, axes = plt.subplots(1,3, figsize=[9,3])\n", "fig.subplots_adjust(wspace=0)\n", "\n", "omega = 0 # no control\n", "smd.set_control(ts, 2*np.sin(omega*ts))\n", "res = smd.simulate(y0, t_span, dt, True)\n", "\n", "# Default\n", "model = DMD.from_full(res.y, res.t)\n", "axes[0].plot(model.orig_timesteps, model.predict_dst()[0].real, c=color[0])\n", "axes[0].plot(model.orig_timesteps, model.X1[0], ls=':', c='k')\n", "axes[0].set_title('Full state measurement')\n", "\n", "# Only measure x:\n", "model = DMD.from_full(res.y[0,:].reshape(1,-1), res.t)\n", "axes[1].plot(model.orig_timesteps, model.predict_dst()[0].real, c=color[0])\n", "axes[1].plot(model.orig_timesteps, model.X1[0], ls=':', c='k')\n", "axes[1].set_title('Only measure x')\n", "\n", "# Only measure x and time-delay\n", "s = 1\n", "model = DMD.from_full(delay_embed(res.y[0,:].reshape(1,-1), s), res.t[s:])\n", "axes[2].plot(model.orig_timesteps, model.predict_dst()[-2].real, c=color[0])\n", "axes[2].plot(ts, res.y[0], ls=':', c='k')\n", "axes[2].set_title('Only measure x and time delay')\n", "\n", "\n", "fig.legend(['Prediction', 'Data'], fontsize=14, loc='lower right', ncol=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's just inspect an example where the control is nonlinear. Can we capture the dynamics with regular DMD? Yes, if we time-delay enough." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:49.223481Z", "start_time": "2019-11-01T16:12:45.756430Z" } }, "outputs": [], "source": [ "smd.set_control(ts, 4*np.sin(0.1*ts)**3) # weird non-linear forcing\n", "res = smd.simulate(y0, t_span, dt, True)\n", "\n", "fig, axes = plt.subplots(1,2,figsize=[8,3])\n", "model = DMD.from_full(res.y, res.t)\n", "# Ironically, the best fit linear operator looks like the underlying dynamics.\n", "axes[0].plot(model.orig_timesteps, model.predict_dst()[-2].real, c=color[0])\n", "axes[0].plot(ts, res.y[0], ls=':', c='k')\n", "axes[0].set_title('DMD without time-delay')\n", "\n", "# Delay embed to capture extra control frequencies? Needs a bunch...\n", "# Usage note: Be sure to take the last rows of the model prediction--these use the past delays \n", "# to predict the future (as opposed to the opposite for the first rows).\n", "s = 50\n", "model = DMD.from_full(delay_embed(res.y, s), res.t[s:])\n", "axes[1].plot(model.orig_timesteps, model.predict_dst()[-2].real, c=color[0])\n", "axes[1].plot(ts, res.y[0], ls=':', c='k')\n", "axes[1].set_title('DMD with time-delay')\n", "\n", "fig.legend(['Prediction', 'Data'], fontsize=14, loc='lower right', ncol=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's have DMDc go to work on the same data. No time-delays needed and we get the underlying dynamics. Compare this result to the previous one without DMDc." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2019-11-01T16:12:49.345077Z", "start_time": "2019-11-01T16:12:49.225570Z" } }, "outputs": [], "source": [ "fig, axes = plt.subplots(1,2,figsize=[9,3])\n", "\n", "Ups = (4*np.sin(0.1*ts)**3).reshape(1,-1) # weird non-linear forcing\n", "smd.set_control(ts, Ups)\n", "res = smd.simulate(y0, t_span, dt, True)\n", "X = res.y\n", "\n", "model = DMDc.from_full(X, Ups, ts)\n", "axes[0].plot(model.orig_timesteps, model.predict_dst()[-2], c=color[0])\n", "axes[0].plot(ts, res.y[0], ls=':', c='k')\n", "axes[0].set_title('DMDc prediction with control')\n", "\n", "smd.set_control(ts, np.zeros_like(ts))\n", "res0 = smd.simulate(y0, t_span, dt, True)\n", "axes[1].plot(model.orig_timesteps, model.predict_dst(model.zero_control())[-2].real, c=color[0])\n", "axes[1].plot(res0.t, res0.y[0], ls=':', c='k')\n", "axes[1].set_title('DMDc learned intrinsic dynamics')\n", "\n", "fig.legend(['Prediction', 'Data'], fontsize=14, loc='lower right', ncol=1)\n", "print('Equal dynamics? ' + str(np.allclose(model.A, sp.linalg.expm(smd.A*dt), atol=1e-3)))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" }, "latex_envs": { "LaTeX_envs_menu_present": true, "autoclose": false, "autocomplete": true, "bibliofile": "biblio.bib", "cite_by": "apalike", "current_citInitial": 1, "eqLabelWithNumbers": true, "eqNumInitial": 1, "hotkeys": { "equation": "Ctrl-E", "itemize": "Ctrl-I" }, "labels_anchors": false, "latex_user_defs": false, "report_style_numbering": false, "user_envs_cfg": false }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": { "height": "calc(100% - 180px)", "left": "10px", "top": "150px", "width": "273.711px" }, "toc_section_display": true, "toc_window_display": true }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }