Source code for motrackers.kalman_tracker

import numpy as np


[docs]class KalmanFilter: """ Kalman Filter Implementation. Args: transition_matrix (numpy.ndarray): Transition matrix of shape ``(n, n)``. measurement_matrix (numpy.ndarray): Measurement matrix of shape ``(m, n)``. control_matrix (numpy.ndarray): Control matrix of shape ``(m, n)``. process_noise_covariance (numpy.ndarray): Covariance matrix of shape ``(n, n)``. measurement_noise_covariance (numpy.ndarray): Covariance matrix of shape ``(m, m)``. prediction_covariance (numpy.ndarray): Predicted (a priori) estimate covariance of shape ``(n, n)``. initial_state (numpy.ndarray): Initial state of shape ``(n,)``. """ def __init__( self, transition_matrix, measurement_matrix, control_matrix=None, process_noise_covariance=None, measurement_noise_covariance=None, prediction_covariance=None, initial_state=None ): self.state_size = transition_matrix.shape[1] self.observation_size = measurement_matrix.shape[1] self.transition_matrix = transition_matrix self.measurement_matrix = measurement_matrix self.control_matrix = 0 if control_matrix is None else control_matrix self.process_covariance = np.eye(self.state_size) \ if process_noise_covariance is None else process_noise_covariance self.measurement_covariance = np.eye(self.observation_size) \ if measurement_noise_covariance is None else measurement_noise_covariance self.prediction_covariance = np.eye(self.state_size) if prediction_covariance is None else prediction_covariance self.x = np.zeros((self.state_size, 1)) if initial_state is None else initial_state
[docs] def predict(self, u=0): """ Prediction step of Kalman Filter. Args: u (float or int or numpy.ndarray): Control input. Default is `0`. Returns: numpy.ndarray : State vector of shape `(n,)`. """ self.x = np.dot(self.transition_matrix, self.x) + np.dot(self.control_matrix, u) self.prediction_covariance = np.dot( np.dot(self.transition_matrix, self.prediction_covariance), self.transition_matrix.T ) + self.process_covariance return self.x
[docs] def update(self, z): """ Measurement update of Kalman Filter. Args: z (numpy.ndarray): Measurement vector of the system with shape ``(m,)``. """ y = z - np.dot(self.measurement_matrix, self.x) innovation_covariance = np.dot( self.measurement_matrix, np.dot(self.prediction_covariance, self.measurement_matrix.T) ) + self.measurement_covariance optimal_kalman_gain = np.dot( np.dot(self.prediction_covariance, self.measurement_matrix.T), np.linalg.inv(innovation_covariance) ) self.x = self.x + np.dot(optimal_kalman_gain, y) eye = np.eye(self.state_size) _t1 = eye - np.dot(optimal_kalman_gain, self.measurement_matrix) t1 = np.dot(np.dot(_t1, self.prediction_covariance), _t1.T) t2 = np.dot(np.dot(optimal_kalman_gain, self.measurement_covariance), optimal_kalman_gain.T) self.prediction_covariance = t1 + t2
def get_process_covariance_matrix(dt): """ Generates a process noise covariance matrix for constant acceleration motion. Args: dt (float): Timestep. Returns: numpy.ndarray: Process covariance matrix of shape `(3, 3)`. """ # a = np.array([ # [0.25 * dt ** 4, 0.5 * dt ** 3, 0.5 * dt ** 2], # [0.5 * dt ** 3, dt ** 2, dt], # [0.5 * dt ** 2, dt, 1] # ]) a = np.array([ [dt ** 6 / 36., dt ** 5 / 24., dt ** 4 / 6.], [dt ** 5 / 24., 0.25 * dt ** 4, 0.5 * dt ** 3], [dt ** 4 / 6., 0.5 * dt ** 3, dt ** 2] ]) return a def get_transition_matrix(dt): """ Generate the transition matrix for constant acceleration motion. Args: dt (float): Timestep. Returns: numpy.ndarray: Transition matrix of shape ``(3, 3)``. """ return np.array([[1., dt, dt * dt * 0.5], [0., 1., dt], [0., 0., 1.]])
[docs]class KFTrackerConstantAcceleration(KalmanFilter): """ Kalman Filter with constant acceleration kinematic model. Args: initial_measurement (numpy.ndarray): Initial state of the tracker. time_step (float) : Time step. process_noise_scale (float): Process noise covariance scale. or covariance magnitude as scalar value. measurement_noise_scale (float): Measurement noise covariance scale. or covariance magnitude as scalar value. """ def __init__(self, initial_measurement, time_step=1, process_noise_scale=1.0, measurement_noise_scale=1.0): self.time_step = time_step measurement_size = initial_measurement.shape[0] transition_matrix = np.zeros((3 * measurement_size, 3 * measurement_size)) measurement_matrix = np.zeros((measurement_size, 3 * measurement_size)) process_noise_covariance = np.zeros((3 * measurement_size, 3 * measurement_size)) measurement_noise_covariance = np.eye(measurement_size) initial_state = np.zeros((3 * measurement_size,)) a = get_transition_matrix(self.time_step) q = get_process_covariance_matrix(self.time_step) for i in range(measurement_size): transition_matrix[3 * i:3 * i + 3, 3 * i:3 * i + 3] = a measurement_matrix[i, 3 * i] = 1. process_noise_covariance[3 * i:3 * i + 3, 3 * i:3 * i + 3] = process_noise_scale * q measurement_noise_covariance[i, i] = measurement_noise_scale initial_state[i * 3] = initial_measurement[i] prediction_noise_covariance = np.ones((3*measurement_size, 3*measurement_size)) super().__init__(transition_matrix=transition_matrix, measurement_matrix=measurement_matrix, process_noise_covariance=process_noise_covariance, measurement_noise_covariance=measurement_noise_covariance, prediction_covariance=prediction_noise_covariance, initial_state=initial_state)
[docs]class KFTracker1D(KFTrackerConstantAcceleration): def __init__(self, initial_measurement=np.array([0.]), time_step=1, process_noise_scale=1.0, measurement_noise_scale=1.0): assert initial_measurement.shape[0] == 1, initial_measurement.shape super().__init__( initial_measurement=initial_measurement, time_step=time_step, process_noise_scale=process_noise_scale, measurement_noise_scale=measurement_noise_scale )
[docs]class KFTracker2D(KFTrackerConstantAcceleration): def __init__(self, initial_measurement=np.array([0., 0.]), time_step=1, process_noise_scale=1.0, measurement_noise_scale=1.0): assert initial_measurement.shape[0] == 2, initial_measurement.shape super().__init__( initial_measurement=initial_measurement, time_step=time_step, process_noise_scale=process_noise_scale, measurement_noise_scale=measurement_noise_scale )
[docs]class KFTracker4D(KFTrackerConstantAcceleration): def __init__(self, initial_measurement=np.array([0., 0., 0., 0.]), time_step=1, process_noise_scale=1.0, measurement_noise_scale=1.0): assert initial_measurement.shape[0] == 4, initial_measurement.shape super().__init__( initial_measurement=initial_measurement, time_step=time_step, process_noise_scale=process_noise_scale, measurement_noise_scale=measurement_noise_scale )
[docs]class KFTrackerSORT(KalmanFilter): """ Kalman filter for ``SORT``. Args: bbox (numpy.ndarray): Bounding box coordinates as ``(xmid, ymid, area, aspect_ratio)``. time_step (float or int): Time step. process_noise_scale (float): Scale (a.k.a covariance) of the process noise. measurement_noise_scale (float): Scale (a.k.a. covariance) of the measurement noise. """ def __init__(self, bbox, process_noise_scale=1.0, measurement_noise_scale=1.0, time_step=1): assert bbox.shape[0] == 4, bbox.shape t = time_step transition_matrix = np.array([ [1., 0, 0, 0, t, 0, 0], [0, 1, 0, 0, 0, t, 0], [0, 0, 1, 0, 0, 0, t], [0, 0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 0, 1]]) measurement_matrix = np.array([ [1., 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0]]) process_noise_covariance = np.array([ [1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 1, 0], [0, 0, 1, 0, 0, 0, 1], [0, 0, 0, 1, 0, 0, 0], [1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 1, 0], [0, 0, 1, 0, 0, 0, 1]]) * process_noise_scale process_noise_covariance[-1, -1] *= 0.01 process_noise_covariance[4:, 4:] *= 0.01 measurement_noise_covariance = np.eye(4) * measurement_noise_scale measurement_noise_covariance[2:, 2:] *= 0.01 prediction_covariance = np.ones_like(transition_matrix) * 10. prediction_covariance[4:, 4:] *= 100. initial_state = np.array([bbox[0], bbox[1], bbox[2], bbox[3], 0., 0., 0.]) super().__init__(transition_matrix, measurement_matrix, process_noise_covariance=process_noise_covariance, measurement_noise_covariance=measurement_noise_covariance, prediction_covariance=prediction_covariance, initial_state=initial_state)
def test_KFTracker1D(): import matplotlib.pyplot as plt def create_data(t=1000, prediction_noise=1, measurement_noise=1, non_linear_input=True, velocity_scale=1 / 200.): x = np.zeros((t,)) if non_linear_input: vel = np.array([np.sin(i * np.pi * velocity_scale) for i in range(t)]) else: vel = np.array([0.001 * i for i in range(t)]) vel_noise = vel + np.random.randn(t) * prediction_noise x_noise = np.zeros((t,)) x_measure_noise = np.random.randn(t) * measurement_noise x_noise[0] = 0. x_measure_noise[0] += x_noise[0] for i in range(t): x[i] = x[i - 1] + vel[i - 1] x_noise[i] = x[i - 1] + vel_noise[i - 1] x_measure_noise[i] += x_noise[i] return x, vel, x_noise, vel_noise, x_measure_noise t = 1000 x, vel, x_noise, vel_noise, x_measure_noise = create_data(t=t) kf = KFTracker1D( initial_measurement=np.array([x_measure_noise[0]]), process_noise_scale=1, measurement_noise_scale=1) x_prediction = [np.array([x_measure_noise[0], 0, 0])] for i in range(1, t): x_prediction.append(kf.predict()) kf.update(x_measure_noise[i]) x_prediction = np.array(x_prediction) time = np.arange(t) a = [time, x, '-', time, x_measure_noise, '--', time, x_prediction[:, 0], '-.'] plt.plot(*a) plt.legend(['true', 'noise', 'kf']) plt.xlim([0, t]) plt.grid(True) plt.show() def test_KFTracker2D(): kf = KFTracker2D(time_step=1) print('measurement matrix:') print(kf.measurement_matrix) print() print('process cov:') print(kf.process_covariance) print() print('transition matrix:') print(kf.transition_matrix) print() print('measurement cov:') print(kf.measurement_covariance) print() print('state:') print(kf.x) print() print('predicted measurement:') print(np.dot(kf.measurement_matrix, kf.x)) print() print('prediction:') print(kf.predict()) print() kf.update(np.array([1.5, 1.5])) print('prediction2:') print(kf.predict()) if __name__ == '__main__': test_KFTracker1D() test_KFTracker2D()