I am practicing Kalman filtering and wrote a short python class that uses Numpy to calculate the 2-D kalman filter for position and velocity along the X axis: assume that the object is only moving along the X-axis since it's on a flat ground.
The following code is syntactically correct and when executed looks like functionally correct. The printed distance approaches the measured distance, since in the loop, it's assumed the object is stationary.
Please have a look at the below and let me know if it's the correct implementation of a 2-D kalman filter?
note I have set process errors and observation errors to zero for now.
import numpy as np
class KalmanFilter(object):
def __init__(self, delta_t,
initial_position,
initial_velocity,
acceleration,
calc_error_in_position,
calc_error_in_velocity,
obs_error_in_position,
obs_error_in_velocity):
self.obs_Error = 0.
self.w = 0.
self.q = 0.
self.I = np.array([[1, 0], [0, 1]])
self.A = np.array([[1, delta_t], [0, 1]])
self.B = np.array([[.5 * (delta_t**2), delta_t]])
self.previous_state = np.array(
[[initial_position], [initial_velocity]])
self.predicted_state = self._calculate_predicted_state(acceleration)
self.measured_state = np.zeros((2, 1))
self.previous_P = np.array([
[calc_error_in_position**2,
(calc_error_in_position * calc_error_in_velocity)],
[(calc_error_in_position * calc_error_in_velocity),
calc_error_in_velocity**2]
])
self.R = np.array([
[obs_error_in_position**2,
(obs_error_in_position * obs_error_in_velocity)],
[(obs_error_in_position * obs_error_in_velocity), obs_error_in_velocity**2]
])
def _adjust_measured_observation(self, observed_position, observed_velocity):
obs_X = np.array([[observed_position], [observed_velocity]])
self.observed_state = self.I * obs_X + self.obs_Error
def _calculate_predicted_state(self, acceleration):
self.predicted_state = (
self.A * self.previous_state) + (self.B * acceleration) + self.w
def _calculate_predicted_covariance(self):
self.P = (self.A * self.previous_P * self.A.T) + self.q
def _update_predicted_convariance(self):
self.previous_P = np.array(
(self.I - (self.kalman_gain * self.I)) * self.P)
def _calculate_kalman_gain(self):
self.kalman_gain = (self.P * self.I.T) / \
((self.I * self.P * self.I.T) + self.R)
def _calculate_current_position(self):
self.current_state = self.previous_state + \
(self.kalman_gain * (self.observed_state - (self.I * self.previous_state)))
self.previous_state = np.array(self.current_state)
def get_current_position(self, observed_position, observed_velocity, acceleration):
self._calculate_predicted_state(acceleration)
self._calculate_predicted_covariance()
self._calculate_kalman_gain()
self._adjust_measured_observation(observed_position, observed_velocity)
self._calculate_current_position()
self._update_predicted_convariance()
return self.previous_state[0][0]
if __name__ == '__main__':
kf = KalmanFilter(1, 1.5, .5, .1, .3, .4, .3, .8)
for i in range(20):
print(kf.get_current_position(2.5, .8, .1))
-
\$\begingroup\$ You may also want to read codereview.stackexchange.com/questions/219098/… \$\endgroup\$Reinderien– Reinderien2024年12月23日 13:17:27 +00:00Commented Dec 23, 2024 at 13:17
1 Answer 1
let me know if it's the correct implementation of a 2-D kalman filter
It definitely isn't. You're exclusively using the Hadamard product through the *
operator when there are several matrix products @
needed. The rest of this review will go over implementation details without worrying that the implementation is incorrect.
Since self.I
is an identity matrix, writing I.T
is by definition pointless; this is more evidence that the implementation is incorrect.
Remove (object)
since you aren't (or, at least, shouldn't be) in Python 2 anymore.
Since this class regularly mutates its members, it can't be e.g. a NamedTuple
or frozen @dataclass
, but you can still write __slots__
so that no new members are accidentally set, and you can make several of the arrays read-only.
Add PEP484 type hints.
I
should be set from an np.eye()
call.
You have several instances of parentheses that are redundant in context due to order of operations, such as .5 * (delta_t**2)
. To de-noise the code I propose removing several of these.
Make obs_Error
lowercase.
Writing self.predicted_state = self._calculate_predicted_state(acceleration)
almost certainly violates your intent. _calculate_predicted_state
already sets self.predicted_state
, but then returns a None
that you use to overwrite the same variable!
self.measured_state
is unused so delete it.
Instead of
obs_X = np.array([[observed_position], [observed_velocity]])
self.observed_state = self.I * obs_X + self.obs_Error
write
obs_X = np.diag((observed_position, observed_velocity))
self.observed_state = obs_X + self.obs_error
Avoid writing a line continuation \
, and instead wrap a multi-line expression in parens.
Instead of
self.previous_state = np.array(self.current_state)
just call .copy()
; or better yet avoid a referential replace by assigning to [:]
.
Don't double index like self.previous_state[0][0]
. Instead write [0,0]
.
Your constructor failed to set an initial predicted_state
, kalman_gain
, P
, etc. You should do that, among other reasons because best practice is to avoid assigning new members outside of the constructor, and it takes care of all of the allocation in one shot.
import numpy as np
class KalmanFilter:
__slots__ = (
'w', 'q', 'A', 'B', 'I', 'P', 'R',
'obs_error', 'kalman_gain', 'previous_P',
'observed_state', 'current_state', 'predicted_state', 'previous_state', 'measured_state',
)
def __init__(
self,
delta_t: float,
initial_position: float,
initial_velocity: float,
acceleration: float,
calc_error_in_position: float,
calc_error_in_velocity: float,
obs_error_in_position: float,
obs_error_in_velocity: float,
) -> None:
self.obs_error = 0.
self.w = 0.
self.q = 0.
self.I = np.eye(2)
self.A = np.array((
(1, delta_t),
(0, 1),
))
self.B = np.array([(.5*delta_t**2, delta_t)])
self.current_state, self.observed_state, self.predicted_state, self.kalman_gain, self.P = np.full(
shape=(5, 2, 2), fill_value=np.nan,
)
self.previous_state = np.array((
[initial_position],
[initial_velocity],
))
self._calculate_predicted_state(acceleration)
self.previous_P = np.array((
(calc_error_in_position**2, calc_error_in_position*calc_error_in_velocity),
(calc_error_in_position*calc_error_in_velocity, calc_error_in_velocity**2),
))
self.R = np.array((
(obs_error_in_position**2, obs_error_in_position*obs_error_in_velocity),
(obs_error_in_position*obs_error_in_velocity, obs_error_in_velocity**2),
))
for array in (self.I, self.A, self.B, self.R):
array.flags.writeable = False
def _adjust_measured_observation(self, observed_position: float, observed_velocity: float) -> None:
obs_X = np.diag((observed_position, observed_velocity))
self.observed_state[:] = obs_X + self.obs_error
def _calculate_predicted_state(self, acceleration: float) -> None:
self.predicted_state[:] = (
self.A*self.previous_state + self.B*acceleration + self.w
)
def _calculate_predicted_covariance(self) -> None:
self.P[:] = self.A*self.previous_P*self.A.T + self.q
def _update_predicted_convariance(self) -> None:
self.previous_P[:] = (self.I - self.kalman_gain*self.I)*self.P
def _calculate_kalman_gain(self) -> None:
self.kalman_gain[:] = self.P*self.I.T / (
self.I*self.P*self.I.T + self.R
)
def _calculate_current_position(self) -> None:
self.current_state[:] = self.previous_state + self.kalman_gain*(
self.observed_state - self.I*self.previous_state
)
# BUG: previous_state starts as (2, 1), but this sets it to (2, 2). That's definitely wrong.
# self.previous_state[:] = self.current_state
self.previous_state = self.current_state
def get_current_position(
self, observed_position: float, observed_velocity: float, acceleration: float,
) -> float:
self._calculate_predicted_state(acceleration)
self._calculate_predicted_covariance()
self._calculate_kalman_gain()
self._adjust_measured_observation(observed_position, observed_velocity)
self._calculate_current_position()
self._update_predicted_convariance()
return self.previous_state[0,0]
def test() -> None:
kf = KalmanFilter(
delta_t=1, initial_position=1.5, initial_velocity=.5, acceleration=.1,
calc_error_in_position=.3, calc_error_in_velocity=.4,
obs_error_in_position=.3, obs_error_in_velocity=.8,
)
positions = [
kf.get_current_position(observed_position=2.5, observed_velocity=.8, acceleration=.1)
for _ in range(20)
]
expected = (
2.0 ,
2.1666666666666666,
2.25 ,
2.3 ,
2.3333333333333333,
2.3571428571428568,
2.375 ,
2.3888888888888889,
2.4 ,
2.4090909090909091,
2.4166666666666667,
2.423076923076923 ,
2.4285714285714286,
2.4333333333333333,
2.4375 ,
2.4411764705882355,
2.4444444444444444,
2.447368421052632 ,
2.45 ,
2.4523809523809526,
)
assert np.allclose(positions, expected, atol=0, rtol=1e-15)
if __name__ == '__main__':
test()