# -*- coding: utf-8 -*-
'''
File name : kalman_filter.py
File Description : Kalman Filter Algorithm Implementation
Author : Srini Ananthakrishnan
Date created : 07/14/2017
Date last modified: 07/16/2017
Python Version : 3.6
Fyi : https://github.com/srianant/kalman_filter_multi_object_tracking
'''
# Import python libraries
import numpy as np
#from kalman_filter import KalmanFilter
#from common import dprint
from scipy.optimize import linear_sum_assignment
class KalmanFilter(object):
"""
Reference: https://en.wikipedia.org/wiki/Kalman_filter
Attributes: None
"""
def __init__(self):
"""Initialize variable used by Kalman Filter class
Args:
None
Return:
None
"""
self.dt = 0.005 # delta time
self.A = np.array([[1, 0], [0, 1]]) # matrix in observation equations
self.u = np.zeros((2, 1)) # previous state vector
# (x,y) tracking object center
self.b = np.array([[0], [255]]) # vector of observations
self.P = np.diag((3.0, 3.0)) # covariance matrix
self.F = np.array([[1.0, self.dt], [0.0, 1.0]]) # state transition mat
self.Q = np.eye(self.u.shape[0]) # process noise matrix
self.R = np.eye(self.b.shape[0]) # observation noise matrix
self.lastResult = np.array([[0], [255]])
def predict(self):
"""Predict state vector u and variance of uncertainty P (covariance).
where,
u: previous state vector
P: previous covariance matrix
F: state transition matrix
Q: process noise matrix
Equations:
u'_{k|k-1} = Fu'_{k-1|k-1}
P_{k|k-1} = FP_{k-1|k-1} F.T + Q
where,
F.T is F transpose
Args:
None
Return:
vector of predicted state estimate
"""
# Predicted state estimate
self.u = np.round(np.dot(self.F, self.u))
# Predicted estimate covariance
self.P = np.dot(self.F, np.dot(self.P, self.F.T)) + self.Q
self.lastResult = self.u # same last predicted result
return self.u
def correct(self, b, flag):
"""Correct or update state vector u and variance of uncertainty P (covariance).
where,
u: predicted state vector u
A: matrix in observation equations
b: vector of observations
P: predicted covariance matrix
Q: process noise matrix
R: observation noise matrix
Equations:
C = AP_{k|k-1} A.T + R
K_{k} = P_{k|k-1} A.T(C.Inv)
u'_{k|k} = u'_{k|k-1} + K_{k}(b_{k} - Au'_{k|k-1})
P_{k|k} = P_{k|k-1} - K_{k}(CK.T)
where,
A.T is A transpose
C.Inv is C inverse
Args:
b: vector of observations
flag: if "true" prediction result will be updated else detection
Return:
predicted state vector u
"""
if not flag: # update using prediction
self.b = self.lastResult
else: # update using detection
self.b = b
C = np.dot(self.A, np.dot(self.P, self.A.T)) + self.R
K = np.dot(self.P, np.dot(self.A.T, np.linalg.inv(C)))
self.u = np.round(self.u + np.dot(K, (self.b - np.dot(self.A,
self.u))))
self.P = self.P - np.dot(K, np.dot(C, K.T))
self.lastResult = self.u
return self.u
class Track(object):
"""Track class for every object to be tracked
Attributes:
None
"""
def __init__(self, prediction, trackIdCount):
"""Initialize variables used by Track class
Args:
prediction: predicted centroids of object to be tracked
trackIdCount: identification of each track object
Return:
None
"""
self.track_id = trackIdCount # identification of each track object
self.KF = KalmanFilter() # KF instance to track this object
self.prediction = np.asarray(prediction) # predicted centroids (x,y)
self.skipped_frames = 0 # number of frames skipped undetected
self.trace = [] # trace path
self.my = []
class Tracker(object):
"""Tracker class that updates track vectors of object tracked
Attributes:
None
"""
def __init__(self, dist_thresh, max_frames_to_skip, max_trace_length,
trackIdCount):
"""Initialize variable used by Tracker class
Args:
dist_thresh: distance threshold. When exceeds the threshold,
track will be deleted and new track is created
max_frames_to_skip: maximum allowed frames to be skipped for
the track object undetected
max_trace_lenght: trace path history length
trackIdCount: identification of each track object
Return:
None
"""
self.dist_thresh = dist_thresh
self.max_frames_to_skip = max_frames_to_skip
self.max_trace_length = max_trace_length
self.tracks = []
self.trackIdCount = trackIdCount
def Update(self, detections):
"""Update tracks vector using following steps:
- Create tracks if no tracks vector found
- Calculate cost using sum of square distance
between predicted vs detected centroids
- Using Hungarian Algorithm assign the correct
detected measurements to predicted tracks
https://en.wikipedia.org/wiki/Hungarian_algorithm
- Identify tracks with no assignment, if any
- If tracks are not detected for long time, remove them
- Now look for un_assigned detects
- Start new tracks
- Update KalmanFilter state, lastResults and tracks trace
Args:
detections: detected centroids of object to be tracked
Return:
None
"""
# Create tracks if no tracks vector found
if (len(self.tracks) == 0):
for i in range(len(detections)):
track = Track(detections[i], self.trackIdCount)
self.trackIdCount += 1
self.tracks.append(track)
# Calculate cost using sum of square distance between
# predicted vs detected centroids
N = len(self.tracks)
M = len(detections)
cost = np.zeros(shape=(N, M)) # Cost matrix
for i in range(len(self.tracks)):
for j in range(len(detections)):
try:
diff = self.tracks[i].prediction - detections[j]
distance = np.sqrt(diff[0][0]*diff[0][0] +
diff[1][0]*diff[1][0])
cost[i][j] = distance
except:
pass
# Let's average the squared ERROR
cost = (0.5) * cost
# Using Hungarian Algorithm assign the correct detected measurements
# to predicted tracks
assignment = []
for _ in range(N):
assignment.append(-1)
row_ind, col_ind = linear_sum_assignment(cost)
for i in range(len(row_ind)):
assignment[row_ind[i]] = col_ind[i]
# Identify tracks with no assignment, if any
un_assigned_tracks = []
for i in range(len(assignment)):
if (assignment[i] != -1):
# check for cost distance threshold.
# If cost is very high then un_assign (delete) the track
if (cost[i][assignment[i]] > self.dist_thresh
- 1
- 2
- 3
- 4
- 5
- 6
前往页