import numpy as np
[docs]class KalmanFilter:
"""
Kalman Filter Implementation.
Args:
transition_matrix (numpy.ndarray): Transition matrix of shape ``(n, n)``.
measurement_matrix (numpy.ndarray): Measurement matrix of shape ``(m, n)``.
control_matrix (numpy.ndarray): Control matrix of shape ``(m, n)``.
process_noise_covariance (numpy.ndarray): Covariance matrix of shape ``(n, n)``.
measurement_noise_covariance (numpy.ndarray): Covariance matrix of shape ``(m, m)``.
prediction_covariance (numpy.ndarray): Predicted (a priori) estimate covariance of shape ``(n, n)``.
initial_state (numpy.ndarray): Initial state of shape ``(n,)``.
"""
def __init__(
self,
transition_matrix,
measurement_matrix,
control_matrix=None,
process_noise_covariance=None,
measurement_noise_covariance=None,
prediction_covariance=None,
initial_state=None
):
self.state_size = transition_matrix.shape[1]
self.observation_size = measurement_matrix.shape[1]
self.transition_matrix = transition_matrix
self.measurement_matrix = measurement_matrix
self.control_matrix = 0 if control_matrix is None else control_matrix
self.process_covariance = np.eye(self.state_size) \
if process_noise_covariance is None else process_noise_covariance
self.measurement_covariance = np.eye(self.observation_size) \
if measurement_noise_covariance is None else measurement_noise_covariance
self.prediction_covariance = np.eye(self.state_size) if prediction_covariance is None else prediction_covariance
self.x = np.zeros((self.state_size, 1)) if initial_state is None else initial_state
[docs] def predict(self, u=0):
"""
Prediction step of Kalman Filter.
Args:
u (float or int or numpy.ndarray): Control input. Default is `0`.
Returns:
numpy.ndarray : State vector of shape `(n,)`.
"""
self.x = np.dot(self.transition_matrix, self.x) + np.dot(self.control_matrix, u)
self.prediction_covariance = np.dot(
np.dot(self.transition_matrix, self.prediction_covariance), self.transition_matrix.T
) + self.process_covariance
return self.x
[docs] def update(self, z):
"""
Measurement update of Kalman Filter.
Args:
z (numpy.ndarray): Measurement vector of the system with shape ``(m,)``.
"""
y = z - np.dot(self.measurement_matrix, self.x)
innovation_covariance = np.dot(
self.measurement_matrix, np.dot(self.prediction_covariance, self.measurement_matrix.T)
) + self.measurement_covariance
optimal_kalman_gain = np.dot(
np.dot(self.prediction_covariance, self.measurement_matrix.T),
np.linalg.inv(innovation_covariance)
)
self.x = self.x + np.dot(optimal_kalman_gain, y)
eye = np.eye(self.state_size)
_t1 = eye - np.dot(optimal_kalman_gain, self.measurement_matrix)
t1 = np.dot(np.dot(_t1, self.prediction_covariance), _t1.T)
t2 = np.dot(np.dot(optimal_kalman_gain, self.measurement_covariance), optimal_kalman_gain.T)
self.prediction_covariance = t1 + t2
def get_process_covariance_matrix(dt):
"""
Generates a process noise covariance matrix for constant acceleration motion.
Args:
dt (float): Timestep.
Returns:
numpy.ndarray: Process covariance matrix of shape `(3, 3)`.
"""
# a = np.array([
# [0.25 * dt ** 4, 0.5 * dt ** 3, 0.5 * dt ** 2],
# [0.5 * dt ** 3, dt ** 2, dt],
# [0.5 * dt ** 2, dt, 1]
# ])
a = np.array([
[dt ** 6 / 36., dt ** 5 / 24., dt ** 4 / 6.],
[dt ** 5 / 24., 0.25 * dt ** 4, 0.5 * dt ** 3],
[dt ** 4 / 6., 0.5 * dt ** 3, dt ** 2]
])
return a
def get_transition_matrix(dt):
"""
Generate the transition matrix for constant acceleration motion.
Args:
dt (float): Timestep.
Returns:
numpy.ndarray: Transition matrix of shape ``(3, 3)``.
"""
return np.array([[1., dt, dt * dt * 0.5], [0., 1., dt], [0., 0., 1.]])
[docs]class KFTrackerConstantAcceleration(KalmanFilter):
"""
Kalman Filter with constant acceleration kinematic model.
Args:
initial_measurement (numpy.ndarray): Initial state of the tracker.
time_step (float) : Time step.
process_noise_scale (float): Process noise covariance scale.
or covariance magnitude as scalar value.
measurement_noise_scale (float): Measurement noise covariance scale.
or covariance magnitude as scalar value.
"""
def __init__(self, initial_measurement, time_step=1, process_noise_scale=1.0,
measurement_noise_scale=1.0):
self.time_step = time_step
measurement_size = initial_measurement.shape[0]
transition_matrix = np.zeros((3 * measurement_size, 3 * measurement_size))
measurement_matrix = np.zeros((measurement_size, 3 * measurement_size))
process_noise_covariance = np.zeros((3 * measurement_size, 3 * measurement_size))
measurement_noise_covariance = np.eye(measurement_size)
initial_state = np.zeros((3 * measurement_size,))
a = get_transition_matrix(self.time_step)
q = get_process_covariance_matrix(self.time_step)
for i in range(measurement_size):
transition_matrix[3 * i:3 * i + 3, 3 * i:3 * i + 3] = a
measurement_matrix[i, 3 * i] = 1.
process_noise_covariance[3 * i:3 * i + 3, 3 * i:3 * i + 3] = process_noise_scale * q
measurement_noise_covariance[i, i] = measurement_noise_scale
initial_state[i * 3] = initial_measurement[i]
prediction_noise_covariance = np.ones((3*measurement_size, 3*measurement_size))
super().__init__(transition_matrix=transition_matrix, measurement_matrix=measurement_matrix,
process_noise_covariance=process_noise_covariance,
measurement_noise_covariance=measurement_noise_covariance,
prediction_covariance=prediction_noise_covariance, initial_state=initial_state)
[docs]class KFTracker1D(KFTrackerConstantAcceleration):
def __init__(self, initial_measurement=np.array([0.]), time_step=1, process_noise_scale=1.0,
measurement_noise_scale=1.0):
assert initial_measurement.shape[0] == 1, initial_measurement.shape
super().__init__(
initial_measurement=initial_measurement, time_step=time_step, process_noise_scale=process_noise_scale,
measurement_noise_scale=measurement_noise_scale
)
[docs]class KFTracker2D(KFTrackerConstantAcceleration):
def __init__(self, initial_measurement=np.array([0., 0.]), time_step=1, process_noise_scale=1.0,
measurement_noise_scale=1.0):
assert initial_measurement.shape[0] == 2, initial_measurement.shape
super().__init__(
initial_measurement=initial_measurement, time_step=time_step, process_noise_scale=process_noise_scale,
measurement_noise_scale=measurement_noise_scale
)
[docs]class KFTracker4D(KFTrackerConstantAcceleration):
def __init__(self, initial_measurement=np.array([0., 0., 0., 0.]), time_step=1, process_noise_scale=1.0,
measurement_noise_scale=1.0):
assert initial_measurement.shape[0] == 4, initial_measurement.shape
super().__init__(
initial_measurement=initial_measurement, time_step=time_step, process_noise_scale=process_noise_scale,
measurement_noise_scale=measurement_noise_scale
)
[docs]class KFTrackerSORT(KalmanFilter):
"""
Kalman filter for ``SORT``.
Args:
bbox (numpy.ndarray): Bounding box coordinates as ``(xmid, ymid, area, aspect_ratio)``.
time_step (float or int): Time step.
process_noise_scale (float): Scale (a.k.a covariance) of the process noise.
measurement_noise_scale (float): Scale (a.k.a. covariance) of the measurement noise.
"""
def __init__(self, bbox, process_noise_scale=1.0, measurement_noise_scale=1.0, time_step=1):
assert bbox.shape[0] == 4, bbox.shape
t = time_step
transition_matrix = np.array([
[1., 0, 0, 0, t, 0, 0],
[0, 1, 0, 0, 0, t, 0],
[0, 0, 1, 0, 0, 0, t],
[0, 0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 0, 1]])
measurement_matrix = np.array([
[1., 0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0, 0]])
process_noise_covariance = np.array([
[1, 0, 0, 0, 1, 0, 0],
[0, 1, 0, 0, 0, 1, 0],
[0, 0, 1, 0, 0, 0, 1],
[0, 0, 0, 1, 0, 0, 0],
[1, 0, 0, 0, 1, 0, 0],
[0, 1, 0, 0, 0, 1, 0],
[0, 0, 1, 0, 0, 0, 1]]) * process_noise_scale
process_noise_covariance[-1, -1] *= 0.01
process_noise_covariance[4:, 4:] *= 0.01
measurement_noise_covariance = np.eye(4) * measurement_noise_scale
measurement_noise_covariance[2:, 2:] *= 0.01
prediction_covariance = np.ones_like(transition_matrix) * 10.
prediction_covariance[4:, 4:] *= 100.
initial_state = np.array([bbox[0], bbox[1], bbox[2], bbox[3], 0., 0., 0.])
super().__init__(transition_matrix, measurement_matrix, process_noise_covariance=process_noise_covariance,
measurement_noise_covariance=measurement_noise_covariance,
prediction_covariance=prediction_covariance, initial_state=initial_state)
def test_KFTracker1D():
import matplotlib.pyplot as plt
def create_data(t=1000, prediction_noise=1, measurement_noise=1, non_linear_input=True, velocity_scale=1 / 200.):
x = np.zeros((t,))
if non_linear_input:
vel = np.array([np.sin(i * np.pi * velocity_scale) for i in range(t)])
else:
vel = np.array([0.001 * i for i in range(t)])
vel_noise = vel + np.random.randn(t) * prediction_noise
x_noise = np.zeros((t,))
x_measure_noise = np.random.randn(t) * measurement_noise
x_noise[0] = 0.
x_measure_noise[0] += x_noise[0]
for i in range(t):
x[i] = x[i - 1] + vel[i - 1]
x_noise[i] = x[i - 1] + vel_noise[i - 1]
x_measure_noise[i] += x_noise[i]
return x, vel, x_noise, vel_noise, x_measure_noise
t = 1000
x, vel, x_noise, vel_noise, x_measure_noise = create_data(t=t)
kf = KFTracker1D(
initial_measurement=np.array([x_measure_noise[0]]), process_noise_scale=1, measurement_noise_scale=1)
x_prediction = [np.array([x_measure_noise[0], 0, 0])]
for i in range(1, t):
x_prediction.append(kf.predict())
kf.update(x_measure_noise[i])
x_prediction = np.array(x_prediction)
time = np.arange(t)
a = [time, x, '-', time, x_measure_noise, '--', time, x_prediction[:, 0], '-.']
plt.plot(*a)
plt.legend(['true', 'noise', 'kf'])
plt.xlim([0, t])
plt.grid(True)
plt.show()
def test_KFTracker2D():
kf = KFTracker2D(time_step=1)
print('measurement matrix:')
print(kf.measurement_matrix)
print()
print('process cov:')
print(kf.process_covariance)
print()
print('transition matrix:')
print(kf.transition_matrix)
print()
print('measurement cov:')
print(kf.measurement_covariance)
print()
print('state:')
print(kf.x)
print()
print('predicted measurement:')
print(np.dot(kf.measurement_matrix, kf.x))
print()
print('prediction:')
print(kf.predict())
print()
kf.update(np.array([1.5, 1.5]))
print('prediction2:')
print(kf.predict())
if __name__ == '__main__':
test_KFTracker1D()
test_KFTracker2D()