Source code for motrackers.sort_tracker

import numpy as np
from scipy.optimize import linear_sum_assignment
from motrackers.utils.misc import iou_xywh as iou
from motrackers.track import KFTrackSORT, KFTrack4DSORT
from motrackers.centroid_kf_tracker import CentroidKF_Tracker


def assign_tracks2detection_iou(bbox_tracks, bbox_detections, iou_threshold=0.3):
    """
    Assigns detected bounding boxes to tracked bounding boxes using IoU as a distance metric.

    Args:
        bbox_tracks (numpy.ndarray): Bounding boxes of shape `(N, 4)` where `N` is the number of objects already being tracked.
        bbox_detections (numpy.ndarray): Bounding boxes of shape `(M, 4)` where `M` is the number of objects that are newly detected.
        iou_threshold (float): IoU threshold.

    Returns:
        tuple: Tuple containing the following elements in the given order:
            - matches (numpy.ndarray): Array of shape `(n, 2)` where `n` is the number of pairs formed after
              matching tracks to detections. This is an array of tuples with each element as a matched pair
              of indices `(track_index, detection_index)`.
            - unmatched_detections (numpy.ndarray): Array of shape `(m,)` where `m` is the number of unmatched detections.
            - unmatched_tracks (numpy.ndarray): Array of shape `(k,)` where `k` is the number of unmatched tracks.
    """
    if (bbox_tracks.size == 0) or (bbox_detections.size == 0):
        return np.empty((0, 2), dtype=int), np.arange(len(bbox_detections), dtype=int), np.empty((0,), dtype=int)

    if len(bbox_tracks.shape) == 1:
        bbox_tracks = bbox_tracks[None, :]

    if len(bbox_detections.shape) == 1:
        bbox_detections = bbox_detections[None, :]

    # Pairwise IoU between every predicted track box and every detection box.
    iou_matrix = np.zeros((bbox_tracks.shape[0], bbox_detections.shape[0]), dtype=np.float32)
    for t in range(bbox_tracks.shape[0]):
        for d in range(bbox_detections.shape[0]):
            iou_matrix[t, d] = iou(bbox_tracks[t, :], bbox_detections[d, :])

    # Hungarian assignment; IoU is negated because linear_sum_assignment minimizes total cost.
    assigned_tracks, assigned_detections = linear_sum_assignment(-iou_matrix)

    unmatched_detections, unmatched_tracks = [], []

    for d in range(bbox_detections.shape[0]):
        if d not in assigned_detections:
            unmatched_detections.append(d)

    for t in range(bbox_tracks.shape[0]):
        if t not in assigned_tracks:
            unmatched_tracks.append(t)

    # Filter out matched pairs with low IoU.
    matches = []
    for t, d in zip(assigned_tracks, assigned_detections):
        if iou_matrix[t, d] < iou_threshold:
            unmatched_detections.append(d)
            unmatched_tracks.append(t)
        else:
            matches.append((t, d))

    if len(matches):
        matches = np.array(matches)
    else:
        matches = np.empty((0, 2), dtype=int)

    return matches, np.array(unmatched_detections), np.array(unmatched_tracks)
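
# Illustrative usage sketch (not part of the original module). The box values
# below are made up to show how matches, unmatched detections, and unmatched
# tracks are reported; boxes are in `(x, y, w, h)` format.
#
#     tracks = np.array([[10, 10, 50, 80], [200, 60, 40, 40]], dtype=float)
#     detections = np.array([[12, 11, 50, 78], [400, 400, 30, 30]], dtype=float)
#     matches, unmatched_dets, unmatched_trks = assign_tracks2detection_iou(
#         tracks, detections, iou_threshold=0.3)
#     # matches        -> [[0, 0]]  (track 0 paired with detection 0, IoU ~ 0.9)
#     # unmatched_dets -> [1]       (detection 1 would start a new track)
#     # unmatched_trks -> [1]       (track 1 had no overlapping detection)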


class SORT(CentroidKF_Tracker):
    """
    SORT - Multi object tracker.

    Args:
        max_lost (int): Maximum number of times an object can be lost while tracking.
        tracker_output_format (str): Output format of the tracker.
        iou_threshold (float): Intersection over union minimum value.
        process_noise_scale (float or numpy.ndarray): Process noise covariance matrix of shape (3, 3) or
            covariance magnitude as scalar value.
        measurement_noise_scale (float or numpy.ndarray): Measurement noise covariance matrix of shape (1,) or
            covariance magnitude as scalar value.
        time_step (int or float): Time step for Kalman Filter.
    """

    def __init__(
            self,
            max_lost=0,
            tracker_output_format='mot_challenge',
            iou_threshold=0.3,
            process_noise_scale=1.0,
            measurement_noise_scale=1.0,
            time_step=1
    ):
        self.iou_threshold = iou_threshold
        super().__init__(
            max_lost=max_lost,
            tracker_output_format=tracker_output_format,
            process_noise_scale=process_noise_scale,
            measurement_noise_scale=measurement_noise_scale,
            time_step=time_step
        )

    def _add_track(self, frame_id, bbox, detection_confidence, class_id, **kwargs):
        # An alternative implementation using `KFTrackSORT` is kept below for reference;
        # the active code creates tracks with `KFTrack4DSORT`.
        # self.tracks[self.next_track_id] = KFTrackSORT(
        #     self.next_track_id, frame_id, bbox, detection_confidence, class_id=class_id,
        #     data_output_format=self.tracker_output_format, process_noise_scale=self.process_noise_scale,
        #     measurement_noise_scale=self.measurement_noise_scale, **kwargs
        # )
        self.tracks[self.next_track_id] = KFTrack4DSORT(
            self.next_track_id, frame_id, bbox, detection_confidence, class_id=class_id,
            data_output_format=self.tracker_output_format, process_noise_scale=self.process_noise_scale,
            measurement_noise_scale=self.measurement_noise_scale, kf_time_step=1, **kwargs)
        self.next_track_id += 1
    def update(self, bboxes, detection_scores, class_ids):
        """
        Update the tracker with the detections of the current frame and return the active tracks.

        Args:
            bboxes (numpy.ndarray or list): Detected bounding boxes as `(x, y, w, h)`.
            detection_scores (numpy.ndarray or list): Detection confidence scores.
            class_ids (numpy.ndarray or list): Class IDs of the detections.

        Returns:
            list: Tracks being currently tracked, in the configured output format.
        """
        self.frame_count += 1
        bbox_detections = np.array(bboxes, dtype='int')

        # track_ids_all = list(self.tracks.keys())
        # bbox_tracks = []
        # track_ids = []
        # for track_id in track_ids_all:
        #     bb = self.tracks[track_id].predict()
        #     if np.any(np.isnan(bb)):
        #         self._remove_track(track_id)
        #     else:
        #         track_ids.append(track_id)
        #         bbox_tracks.append(bb)

        # Predict the current location of every existing track with its Kalman filter.
        track_ids = list(self.tracks.keys())
        bbox_tracks = []
        for track_id in track_ids:
            bb = self.tracks[track_id].predict()
            bbox_tracks.append(bb)
        bbox_tracks = np.array(bbox_tracks)

        if len(bboxes) == 0:
            # No detections in this frame: mark every track as lost and prune stale ones.
            for i in range(len(bbox_tracks)):
                track_id = track_ids[i]
                bbox = bbox_tracks[i, :]
                confidence = self.tracks[track_id].detection_confidence
                cid = self.tracks[track_id].class_id
                self._update_track(track_id, self.frame_count, bbox, detection_confidence=confidence,
                                   class_id=cid, lost=1)
                if self.tracks[track_id].lost > self.max_lost:
                    self._remove_track(track_id)
        else:
            # Match predicted track boxes to detections with IoU-based Hungarian assignment.
            matches, unmatched_detections, unmatched_tracks = assign_tracks2detection_iou(
                bbox_tracks, bbox_detections, iou_threshold=self.iou_threshold)

            # Matched tracks are corrected with their assigned detection.
            for i in range(matches.shape[0]):
                t, d = matches[i, :]
                track_id = track_ids[t]
                bbox = bboxes[d, :]
                cid = class_ids[d]
                confidence = detection_scores[d]
                self._update_track(track_id, self.frame_count, bbox, confidence, cid, lost=0)

            # Unmatched detections start new tracks.
            for d in unmatched_detections:
                bbox = bboxes[d, :]
                cid = class_ids[d]
                confidence = detection_scores[d]
                self._add_track(self.frame_count, bbox, confidence, cid)

            # Unmatched tracks are marked lost and removed once they exceed `max_lost`.
            for t in unmatched_tracks:
                track_id = track_ids[t]
                bbox = bbox_tracks[t, :]
                confidence = self.tracks[track_id].detection_confidence
                cid = self.tracks[track_id].class_id
                self._update_track(track_id, self.frame_count, bbox, detection_confidence=confidence,
                                   class_id=cid, lost=1)
                if self.tracks[track_id].lost > self.max_lost:
                    self._remove_track(track_id)

        outputs = self._get_tracks(self.tracks)
        return outputs
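
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module).
# It feeds two frames of synthetic detections, given as `(x, y, w, h)` boxes
# with confidence scores and class IDs, through `SORT.update()` and prints the
# resulting tracks; in practice the detections would come from an object
# detector.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    tracker = SORT(max_lost=3, tracker_output_format='mot_challenge', iou_threshold=0.3)

    # Two frames of fake detections: a single object drifting slightly to the right.
    frames = [
        (np.array([[10, 10, 50, 80]]), np.array([0.90]), np.array([1])),
        (np.array([[14, 11, 50, 80]]), np.array([0.88]), np.array([1])),
    ]

    for frame_bboxes, frame_scores, frame_class_ids in frames:
        tracks = tracker.update(frame_bboxes, frame_scores, frame_class_ids)
        for track in tracks:
            # Each entry is a tuple in the configured tracker output format.
            print(track)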