//::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** @author John Miller * @version 1.3 * @date Sun Sep 13 20:37:41 EDT 2015 * @see LICENSE (MIT style license file). */ package scalation.analytics import scalation.linalgebra.{MatrixD, VectorD} import scalation.plot.Plot import scalation.random.NormalVec //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The `KalmanFilter` class is used to fit state-space models. * @see en.wikipedia.org/wiki/Kalman_filter * FIX: needs more thorough testing * @param ff the state transition matrix * @param hh the observation matrix * @param qq the process noise covariance matrix * @param rr the observation noise covariance matrix * @param bb the optional control-input matrix */ class KalmanFilter (ff: MatrixD, hh: MatrixD, qq: MatrixD, rr: MatrixD, bb: MatrixD = null) { private val MAX_ITER = 20 private val doPlot = true private val n = ff.dim1 private val _0 = VectorD (n) private val _1 = VectorD.one (n) private val ii = MatrixD.eye (n) val traj = if (doPlot) new MatrixD (MAX_ITER, n) else new MatrixD (0, 0) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Iteratively solve for 'x' using predict and update phases. * @param x0 the initial state vector * @param dt the time increment (delta t) * @param u the control vector */ def solve (x0: VectorD, dt: Double, u: VectorD = null): VectorD = { var x = x0 // initial state vector var t = 0.0 // initial time var pp = new MatrixD (n, n) for (k <- 0 until MAX_ITER) { t += dt // advance time if (doPlot) traj(k) = x ++ t // add current time t, state x to trajectory // predict x = ff * x // new predicted state if (u != null && bb != null) x += bb * u pp = ff * pp * ff.t + qq // new predicted covariance // update val v = NormalVec (_0, rr).gen // observation noise val z = hh * x + v // new observation val y = z - hh * x // measurement residual val ss = hh * pp * hh.t + rr // residual covariance val kk = pp * hh.t * ss.inverse // optimal Kalman gain x = x + kk * y // updated state estimate pp = (ii - kk * hh) * pp // updated covariance estimate } // for x } // solve } // KalmanFilter class //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The `KalmanFilterTest` object is used to test the `KalmanFilter` class. * @see en.wikipedia.org/wiki/Kalman_filter * > run-main scalation.analytics.KalmanFilterTest */ object KalmanFilterTest extends App { import scalation.math.double_exp println ("KalmanFilterTest") val dt = 0.1 // time increment (delta t) val var_a = 0.5 // variance of uncontrolled acceleration a val var_z = 0.5 // variance from observation noise val ff = new MatrixD ((2, 2), 1.0, dt, 0.0, 1.0) val hh = new MatrixD ((1, 2), 1.0, 0.0) val qq = new MatrixD ((2, 2), dt~^4/4, dt~^3/2, dt~^3/2, dt~^2) * var_a val rr = new MatrixD ((1, 1), var_z) val kf = new KalmanFilter (ff, hh, qq, rr) val x0 = VectorD (0.0, 0.0) println ("solve = " + kf.solve (x0, dt)) new Plot (kf.traj.col (2), kf.traj.col (0), kf.traj.col (1)) } // KalmanFilterTest object