Source code for motrackers.centroid_kf_tracker

from collections import OrderedDict
import numpy as np
from scipy.spatial import distance
from scipy.optimize import linear_sum_assignment
from motrackers.tracker import Tracker
from motrackers.track import KFTrackCentroid
from motrackers.utils.misc import get_centroid


def assign_tracks2detection_centroid_distances(bbox_tracks, bbox_detections, distance_threshold=10.):
    """
    Assigns detected bounding boxes to tracked bounding boxes using the Euclidean
    distance between bounding-box centroids as the cost metric.

    Args:
        bbox_tracks (numpy.ndarray): Tracked bounding boxes with shape `(n, 4)` and
            each row as `(xmin, ymin, width, height)`.
        bbox_detections (numpy.ndarray): Detection bounding boxes with shape `(m, 4)`
            and each row as `(xmin, ymin, width, height)`.
        distance_threshold (float): Maximum distance between a tracked object and a
            new detection for the pair to be considered a match.

    Returns:
        tuple: Tuple containing the following elements:
            - matches (numpy.ndarray): Array of shape `(n, 2)` where `n` is the number
              of pairs formed after matching tracks to detections. Each row is a
              matched pair of indices `(track_index, detection_index)`.
            - unmatched_detections (numpy.ndarray): Array of shape `(m,)` where `m` is
              the number of unmatched detections.
            - unmatched_tracks (numpy.ndarray): Array of shape `(k,)` where `k` is the
              number of unmatched tracks.
    """
    if (bbox_tracks.size == 0) or (bbox_detections.size == 0):
        return np.empty((0, 2), dtype=int), np.arange(len(bbox_detections), dtype=int), np.empty((0,), dtype=int)

    if len(bbox_tracks.shape) == 1:
        bbox_tracks = bbox_tracks[None, :]

    if len(bbox_detections.shape) == 1:
        bbox_detections = bbox_detections[None, :]

    estimated_track_centroids = get_centroid(bbox_tracks)
    detection_centroids = get_centroid(bbox_detections)
    centroid_distances = distance.cdist(estimated_track_centroids, detection_centroids)

    # Solve the linear assignment problem (Hungarian algorithm) over centroid distances.
    assigned_tracks, assigned_detections = linear_sum_assignment(centroid_distances)

    unmatched_detections, unmatched_tracks = [], []

    for d in range(bbox_detections.shape[0]):
        if d not in assigned_detections:
            unmatched_detections.append(d)

    for t in range(bbox_tracks.shape[0]):
        if t not in assigned_tracks:
            unmatched_tracks.append(t)

    # Filter out matched pairs with high distance between centroids.
    matches = []
    for t, d in zip(assigned_tracks, assigned_detections):
        if centroid_distances[t, d] > distance_threshold:
            unmatched_detections.append(d)
            unmatched_tracks.append(t)
        else:
            matches.append((t, d))

    if len(matches):
        matches = np.array(matches)
    else:
        matches = np.empty((0, 2), dtype=int)

    return matches, np.array(unmatched_detections), np.array(unmatched_tracks)
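
As an illustrative sketch (not part of the library source), the following shows how this assignment behaves on made-up data: the close track-detection pair is matched, while the distant pair is assigned by the Hungarian solver but then rejected by the distance threshold.

import numpy as np
from motrackers.centroid_kf_tracker import assign_tracks2detection_centroid_distances

bbox_tracks = np.array([[10, 10, 20, 20],       # centroid (20, 20)
                        [100, 100, 20, 20]])    # centroid (110, 110)
bbox_detections = np.array([[12, 12, 20, 20],   # ~2.8 px from the first track's centroid
                            [300, 300, 20, 20]])

matches, unmatched_dets, unmatched_tracks = assign_tracks2detection_centroid_distances(
    bbox_tracks, bbox_detections, distance_threshold=10.)

print(matches)           # [[0 0]] -> track 0 paired with detection 0
print(unmatched_dets)    # [1]     -> detection 1 is farther than the threshold
print(unmatched_tracks)  # [1]     -> track 1 found no detection within the threshold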
class CentroidKF_Tracker(Tracker):
    """
    Kalman filter based tracking of multiple detected objects.

    Args:
        max_lost (int): Maximum number of consecutive frames the object was not detected.
        centroid_distance_threshold (float): Maximum centroid distance between a track
            and a detection for the pair to be considered a match.
        tracker_output_format (str): Output format of the tracker.
        process_noise_scale (float or numpy.ndarray): Process noise covariance matrix of shape (3, 3)
            or covariance magnitude as scalar value.
        measurement_noise_scale (float or numpy.ndarray): Measurement noise covariance matrix of shape (1,)
            or covariance magnitude as scalar value.
        time_step (int or float): Time step for Kalman Filter.
    """

    def __init__(
            self,
            max_lost=1,
            centroid_distance_threshold=30.,
            tracker_output_format='mot_challenge',
            process_noise_scale=1.0,
            measurement_noise_scale=1.0,
            time_step=1
    ):
        self.time_step = time_step
        self.process_noise_scale = process_noise_scale
        self.measurement_noise_scale = measurement_noise_scale
        self.centroid_distance_threshold = centroid_distance_threshold
        self.kalman_trackers = OrderedDict()
        super().__init__(max_lost, tracker_output_format)

    def _add_track(self, frame_id, bbox, detection_confidence, class_id, **kwargs):
        # Create a new Kalman-filter track for an unmatched detection.
        self.tracks[self.next_track_id] = KFTrackCentroid(
            self.next_track_id, frame_id, bbox, detection_confidence, class_id=class_id,
            data_output_format=self.tracker_output_format,
            process_noise_scale=self.process_noise_scale,
            measurement_noise_scale=self.measurement_noise_scale,
            **kwargs
        )
        self.next_track_id += 1
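
A minimal construction sketch; the argument values below are illustrative, not recommendations.

from motrackers.centroid_kf_tracker import CentroidKF_Tracker

tracker = CentroidKF_Tracker(
    max_lost=5,                       # tolerate 5 consecutive missed frames before removal
    centroid_distance_threshold=30.,  # reject matches with centroids more than 30 px apart
    tracker_output_format='mot_challenge',
    process_noise_scale=1.0,
    measurement_noise_scale=1.0,
    time_step=1,
)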
    def update(self, bboxes, detection_scores, class_ids):
        """
        Update the tracker with the detections of the current frame.

        Args:
            bboxes (numpy.ndarray): Detection bounding boxes with each row as
                `(xmin, ymin, width, height)`.
            detection_scores (numpy.ndarray): Detection confidence score for each bounding box.
            class_ids (numpy.ndarray): Class id for each bounding box.

        Returns:
            list: Tracks currently maintained by the tracker, formatted according to
            `tracker_output_format`.
        """
        self.frame_count += 1
        bbox_detections = np.array(bboxes, dtype='int')

        # Predict the current position of every existing track with its Kalman filter.
        track_ids = list(self.tracks.keys())
        bbox_tracks = []
        for track_id in track_ids:
            bbox_tracks.append(self.tracks[track_id].predict())
        bbox_tracks = np.array(bbox_tracks)

        if len(bboxes) == 0:
            # No detections in this frame: mark every track as lost and prune stale ones.
            for i in range(len(bbox_tracks)):
                track_id = track_ids[i]
                bbox = bbox_tracks[i, :]
                confidence = self.tracks[track_id].detection_confidence
                cid = self.tracks[track_id].class_id
                self._update_track(track_id, self.frame_count, bbox, detection_confidence=confidence, class_id=cid, lost=1)
                if self.tracks[track_id].lost > self.max_lost:
                    self._remove_track(track_id)
        else:
            matches, unmatched_detections, unmatched_tracks = assign_tracks2detection_centroid_distances(
                bbox_tracks, bbox_detections, distance_threshold=self.centroid_distance_threshold
            )

            # Matched pairs: correct each track with its assigned detection.
            for i in range(matches.shape[0]):
                t, d = matches[i, :]
                track_id = track_ids[t]
                bbox = bbox_detections[d, :]  # index the array form so that list inputs also work
                cid = class_ids[d]
                confidence = detection_scores[d]
                self._update_track(track_id, self.frame_count, bbox, confidence, cid, lost=0)

            # Unmatched detections: start a new track for each.
            for d in unmatched_detections:
                bbox = bbox_detections[d, :]
                cid = class_ids[d]
                confidence = detection_scores[d]
                self._add_track(self.frame_count, bbox, confidence, cid)

            # Unmatched tracks: keep the predicted box, mark as lost, and prune stale ones.
            for t in unmatched_tracks:
                track_id = track_ids[t]
                bbox = bbox_tracks[t, :]
                confidence = self.tracks[track_id].detection_confidence
                cid = self.tracks[track_id].class_id
                self._update_track(track_id, self.frame_count, bbox, confidence, cid, lost=1)
                if self.tracks[track_id].lost > self.max_lost:
                    self._remove_track(track_id)

        outputs = self._get_tracks(self.tracks)
        return outputs
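
A usage sketch of the per-frame loop, reusing the `tracker` constructed above; the detections are made-up stand-ins for the output of an object detector.

import numpy as np

# One frame's made-up detections: (xmin, ymin, width, height) per row.
bboxes = np.array([[50, 60, 30, 40],
                   [200, 210, 25, 35]])
detection_scores = np.array([0.9, 0.8])
class_ids = np.array([0, 1])

tracks = tracker.update(bboxes, detection_scores, class_ids)
for track in tracks:
    print(track)  # one entry per active track, in the configured output format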