Source code for d3d.tracking.filter

# TODO: Deal with rotation filtering: https://math.stackexchange.com/questions/2621677/extended-kalman-filter-equation-for-orientation-quaternion

import logging
from typing import List, Union
from warnings import warn

import filterpy.kalman as kf
import numpy as np
import numpy.linalg as npl
from d3d.abstraction import ObjectTarget3D, TrackingTarget3D
from scipy.spatial.transform import Rotation
from scipy.special import fresnel

_logger = logging.getLogger("d3d")


[docs]def is_pd(B): """Returns true when input is positive-definite, via Cholesky""" try: _ = npl.cholesky(B) return True except npl.LinAlgError: return False
[docs]def nearest_pd(A): """Find the nearest positive-definite matrix to input A Python/Numpy port of John D'Errico's `nearestSPD` MATLAB code [1], which credits [2]. - [1] https://www.mathworks.com/matlabcentral/fileexchange/42885-nearestspd - [2] N.J. Higham, "Computing a nearest symmetric positive semidefinite matrix" (1988): https://doi.org/10.1016/0024-3795(88)90223-6 """ B = (A + A.T) / 2 _, s, V = npl.svd(B, hermitian=True) H = np.dot(V.T, np.dot(np.diag(s), V)) A2 = (B + H) / 2 A3 = (A2 + A2.T) / 2 if is_pd(A3): return A3 spacing = np.spacing(npl.norm(A)) # The above is different from [1]. It appears that MATLAB's `chol` Cholesky # decomposition will accept matrixes with exactly 0-eigenvalue, whereas # Numpy's will not. So where [1] uses `eps(mineig)` (where `eps` is Matlab # for `np.spacing`), we use the above definition. CAVEAT: our `spacing` # will be much larger than [1]'s `eps(mineig)`, since `mineig` is usually on # the order of 1e-16, and `eps(1e-16)` is on the order of 1e-34, whereas # `spacing` will, for Gaussian random matrixes of small dimension, be on # othe order of 1e-16. In practice, both ways converge, as the unit test # below suggests. I = np.eye(A.shape[0]) k = 1 while not is_pd(A3): mineig = np.min(np.real(npl.eigvals(A3))) A3 += I * (-mineig * k**2 + spacing) k += 1 return A3
##### simple motion models: http://fusion.isif.org/proceedings/fusion08CD/papers/1569107835.pdf
[docs]def wrap_angle(theta: float) -> float: ''' Normalize the angle to [-pi, pi) :param theta: angle to be wrapped :return: wrapped angle ''' return (theta + np.pi) % (2*np.pi) - np.pi
[docs]def motion_CV(state: Union[np.ndarray, List[float]], dt: float) -> np.ndarray: ''' Constant Velocity model :param state: original state in format [x, y, vx, vy] :param dt: time difference after last update :return: updated state ''' state = np.copy(state) state[0] += state[2] * dt state[1] += state[3] * dt return state
def motion_CTRV(state: Union[np.ndarray, List[float]], dt: float) -> np.ndarray: raise NotImplementedError()
[docs]def motion_CTRA(state: Union[np.ndarray, List[float]], dt: float) -> np.ndarray: ''' Constant Turn-Rate and (longitudinal) Acceleration model. This model also assume that velocity is the same with heading angle. CV, CTRV can be modeled by assume value equals zero :param state: original state in format [x, y, theta, v, a, w] :param dt: time difference after last update :return: updated state ''' x, y, th, v, a, w = state nth = wrap_angle(th + w * dt) nv = v + a * dt if np.isclose(w, 0): nx = x + (nv + v)/2 * np.cos(th) * dt ny = y + (nv + v)/2 * np.sin(th) * dt else: nx = x + ( nv*w*np.sin(nth) + a*np.cos(nth) - v*w*np.sin(th) - a*np.cos(th)) / (w*w) ny = y + (-nv*w*np.cos(nth) + a*np.sin(nth) + v*w*np.cos(th) - a*np.sin(th)) / (w*w) state = np.copy(state) state[:4] = (nx, ny, nth, nv) return state
[docs]def motion_CSAA(state: np.ndarray, dt: float) -> np.ndarray: ''' Constant Steering Angle and Acceleration model. :param state: original state in format [x, y, theta, v, a, c] [0 1 2 3 4 5] :param dt: time difference after last update :return: updated state ''' x, y, th, v, a, c = state gamma1 = (c*v*v)/(4*a) + th gamma2 = c*dt*v + c*dt*dt*a - th eta = np.sqrt(2*np.pi)*v*c zeta1 = (2*a*dt + v)*np.sqrt(c/2*a*np.pi) zeta2 = v*np.sqrt(c/2*a*np.pi) sz1, cz1 = fresnel(zeta1) sz2, cz2 = fresnel(zeta2) nx = x + (eta * (np.cos(gamma1)*cz1 + np.sin(gamma1)*sz1 - np.cos(gamma1)*cz2 - np.sin(gamma1)*sz2) + 2*np.sin(gamma2)*np.sqrt(a*c) + 2*np.sin(th)*np.sqrt(a*c)) / 4*np.sqrt(a*c)*c ny = y + (eta * (-np.cos(gamma1)*sz1 + np.sin(gamma1)*cz1 - np.sin(gamma1)*cz2 - np.cos(gamma1)*sz2) + 2*np.cos(gamma2)*np.sqrt(a*c) - 2*np.sin(th)*np.sqrt(a*c)) / 4*np.sqrt(a*c)*c nth = wrap_angle(th - c*dt*dt*a/2 - c*dt*v) nv = v + a*dt state = np.copy(state) state[:4] = (nx, ny, nth, nv) return state
##### End motion models #####
[docs]class PropertyFilter: ''' Abstraction class defining the interfaces for filters on target property ''' @property def dimension(self) -> np.ndarray: ''' Current dimension estimation ''' raise NotImplementedError("This is an abstraction filter") @property def dimension_var(self) -> np.ndarray: ''' Covariance for current shape estimation ''' raise NotImplementedError("This is an abstraction filter") @property def classification(self) -> np.ndarray: ''' Current classification estimation ''' raise NotImplementedError("This is an abstraction filter") @property def classification_var(self) -> np.ndarray: ''' Covariance for current classification estimation ''' raise NotImplementedError("This is an abstraction filter")
[docs] def predict(self, dt: float): ''' Predict the current system state :param dt: Time since last update ''' raise NotImplementedError("This is an abstraction filter")
[docs] def update(self, target: Union[ObjectTarget3D, TrackingTarget3D]): ''' Correct the current system state with observation. This method should always be called after calling :meth:`predict`. :param target: New target observation ''' raise NotImplementedError("This is an abstraction filter")
[docs]class PoseFilter: ''' Abstraction class defining the interfaces for filters on target pose ''' @property def position(self) -> np.ndarray: ''' Current position estimation ''' raise NotImplementedError("This is an abstraction filter") @property def position_var(self) -> np.ndarray: ''' Covariance for current position estimation ''' raise NotImplementedError("This is an abstraction filter") @property def orientation(self) -> np.ndarray: ''' Current orientation estimation ''' raise NotImplementedError("This is an abstraction filter") @property def orientation_var(self) -> np.ndarray: ''' Covariance for current orientation estimation ''' raise NotImplementedError("This is an abstraction filter") @property def velocity(self) -> np.ndarray: ''' Current linear velocity estimation ''' raise NotImplementedError("This is an abstraction filter") @property def velocity_var(self) -> np.ndarray: ''' Covariance for current linear velocity estimation ''' raise NotImplementedError("This is an abstraction filter") @property def angular_velocity(self) -> np.ndarray: ''' Current angular velocity estimation ''' raise NotImplementedError("This is an abstraction filter") @property def angular_velocity_var(self) -> np.ndarray: ''' Covariance for current angular velocity esimation ''' raise NotImplementedError("This is an abstraction filter")
[docs] def predict(self, dt: float): ''' Predict the current system state :param dt: Time since last update ''' raise NotImplementedError("This is an abstraction filter")
[docs] def update(self, target: Union[ObjectTarget3D, TrackingTarget3D]): ''' Correct the current system state with observation. This method should always be called after calling :meth:`predict`. :param target: New target observation ''' raise NotImplementedError("This is an abstraction filter")
[docs]class Box_KF(PropertyFilter): ''' Use kalman filter (simple bayesian filter) for a box shape estimation. Use latest value for classification result ''' def __init__(self, init: Union[ObjectTarget3D, TrackingTarget3D], Q: np.ndarray = np.eye(3)): ''' :param init: initial state for the target :param Q: system process noise ''' self._filter = kf.KalmanFilter(dim_x=3, dim_z=3) # initialize matrices self._filter.F = self._filter.H = np.eye(3) self._filter.Q = np.asarray(Q).reshape(3,3) # feed input self._filter.x = init.dimension self._filter.P = init.dimension_var self._saved_tag = init.tag
[docs] def predict(self, dt): self._filter.predict()
[docs] def update(self, target): self._filter.update(target.dimension, R=target.dimension_var) # Update classification # TODO: do bayesian filtering for classification? self._saved_tag = target.tag
@property def dimension(self): return self._filter.x @property def dimension_var(self): return self._filter.P @property def classification(self): return self._saved_tag @property def classification_var(self): raise NotImplementedError()
[docs]class Pose_3DOF_UKF_CV(PoseFilter): ''' UKF using constant velocity (CV) model for pose estimation, assuming 3DoF (x, y, yaw) States: .. [x, y, vx, vy] [0 1 2 3 ] Observe: .. [x, y] [0 1] ''' def __init__(self, init: Union[ObjectTarget3D, TrackingTarget3D], Q: np.ndarray = np.eye(4)): ''' :param init: initial state for the target :param Q: system process noise ''' # create filter sigma_points = kf.JulierSigmaPoints(6) self._filter = kf.UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=None, fx=motion_CV, hx=lambda s: s[:2], points=sigma_points ) self._filter.Q = np.asarray(Q).reshape(4, 4) # feed initial state self._filter.x = np.array([init.position[0], init.position[1], 0, 0]) self._filter.P = np.copy(self._filter.Q) self._filter.P[:2,:2] = init.position_var[:2, :2] self._save_z = init.position[2] # TODO: use simple bayesian filter for z values self._save_z_var = init.position_var[2, 2] self._save_ori = init.orientation # TODO: use simple bayesian filter for orientation self._save_ori_var = init.orientation_var
[docs] def predict(self, dt): self._filter.predict(dt=dt)
[docs] def update(self, detection: ObjectTarget3D): self._save_z = detection.position[2] self._save_z_var = detection.position_var[2, 2] self._save_ori = detection.orientation self._save_ori_var = detection.orientation_var self._filter.update(detection.position[:2], R=detection.position_var[:2,:2])
@property def position(self): return np.array([self._filter.x[0], self._filter.x[1], self._save_z]) @property def position_var(self): cov = np.diag([np.inf, np.inf, self._save_z_var]) cov[:2, :2] = self._filter.P[:2, :2] return cov @property def orientation(self): return self._save_ori @property def orientation_var(self): return self._save_ori_var @property def velocity(self): return np.array([self._filter.x[2], self._filter.x[3], 0]) @property def velocity_var(self): cov = np.zeros((3, 3)) cov[:2, :2] = self._filter.P[2:4, 2:4] return cov @property def angular_velocity(self): return np.zeros(3) @property def angular_velocity_var(self): return np.zeros((3, 3))
[docs]class Pose_3DOF_UKF_CTRV(PoseFilter): ''' UKF using constant turning rate and velocity (CTRV) model for pose estimation States: .. [x, y, rz, v, w] [0 1 2 3 4] Observe: .. [x, y, rz] [0 1 2 ] ''' def __init__(self): raise NotImplementedError()
[docs]class Pose_3DOF_UKF_CTRA(PoseFilter): ''' UKF using constant turning rate and acceleration (CTRA) model for pose estimation States: .. [x, y, rz, v, a, w] [0 1 2 3 4 5] Observe: .. [x, y, rz] [0 1 2 ] ''' def _state_mean(self, sigmas, Wm): x = np.average(sigmas, axis=0, weights=Wm) s = np.average(np.sin(sigmas[:, 2]), weights=Wm) c = np.average(np.cos(sigmas[:, 2]), weights=Wm) x[2] = np.arctan2(s, c) return x def _state_diff(self, x, y): d = x - y d[2] = wrap_angle(d[2]) return d def check_valid(self, note): if np.any(np.isnan(self._filter.x)): raise ValueError("nan occurs in states! (note: %s)" % note) if not is_pd(self._filter.P): newp = nearest_pd(self._filter.P) diff = npl.norm(self._filter.P - newp) message = "Covariance matrix is not positive definite, fixed with diff %.3f! (note: %s)" % (diff, note) if diff < 10: _logger.warning(message) warn(message) else: _logger.error(message) raise RuntimeError(message) self._filter.P = newp def __init__(self, init: Union[ObjectTarget3D, TrackingTarget3D], Q: np.ndarray = np.eye(6)): ''' :param init: Initial state for the target :param Q: System process noise ''' sigma_points = kf.JulierSigmaPoints(6) self._filter = kf.UnscentedKalmanFilter(dim_x=6, dim_z=3, dt=None, fx=motion_CTRA, hx=lambda s: s[:3], points=sigma_points, x_mean_fn=self._state_mean, z_mean_fn=self._state_mean, residual_x=self._state_diff, residual_z=self._state_diff, ) self._filter.Q = np.asarray(Q).reshape(6, 6) # feed initial state yaw, pitch, roll = init.orientation.as_euler("ZYX") self._filter.x = np.array([init.position[0], init.position[1], yaw, 0, 0, 0]) self._filter.P = np.copy(self._filter.Q) self._filter.P[:2, :2] = init.position_var[:2, :2] self._filter.P[2, 2] = init.orientation_var self._save_z = init.position[2] self._save_z_var = init.position_var[2, 2] self._save_pitch = pitch # TODO: use simple bayesian filter for pitch and roll values self._save_roll = roll self.check_valid("initialize")
[docs] def predict(self, dt): self._filter.predict(dt=dt) self.check_valid("prediction")
[docs] def update(self, detection: ObjectTarget3D): yaw, pitch, roll = detection.orientation.as_euler("ZYX") self._save_z = detection.position[2] self._save_z_var = detection.position_var[2, 2] self._save_pitch = pitch self._save_roll = roll self._save_ori_var = detection.orientation_var obsv = np.array([detection.position[0], detection.position[1], yaw]) R = np.zeros((3, 3)) R[:2, :2] = detection.position_var[:2, :2] R[2, 2] = detection.orientation_var self._filter.update(obsv, R=R) self._filter.x[2] = wrap_angle(self._filter.x[2]) self.check_valid("update")
@property def position(self): return np.array([self._filter.x[0], self._filter.x[1], self._save_z]) @property def position_var(self): cov = np.diag([np.inf, np.inf, self._save_z_var]) cov[:2, :2] = self._filter.P[:2, :2] return cov @property def orientation(self): return Rotation.from_euler("ZYX", [self._filter.x[2], self._save_pitch, self._save_roll]) @property def orientation_var(self): return self._save_z_var @property def velocity(self): return np.array([ self._filter.x[3] * np.cos(self._filter.x[2]), self._filter.x[3] * np.sin(self._filter.x[2]), 0 ]) @property def velocity_var(self): # here we will return the linearized covariance cov = np.zeros((3, 3)) A = np.array([ [-self._filter.x[3] * np.cos(self._filter.x[2]), np.cos(self._filter.x[2])], [ self._filter.x[3] * np.sin(self._filter.x[2]), np.sin(self._filter.x[2])] ]) cov[:2, :2] = np.dot(A, self._filter.P[2:4, 2:4]) return cov @property def angular_velocity(self): return np.array([0, 0, self._filter.x[5]]) @property def angular_velocity_var(self): return np.diag([0, 0, self._filter.P[5, 5]])
[docs]class Pose_IMM(PoseFilter): ''' UKF using IMM model for pose estimation ''' def __init__(self): raise NotImplementedError()