6
\$\begingroup\$

I am practicing Kalman filtering and wrote a short python class that uses Numpy to calculate the 2-D kalman filter for position and velocity along the X axis: assume that the object is only moving along the X-axis since it's on a flat ground.

The following code is syntactically correct and when executed looks like functionally correct. The printed distance approaches the measured distance, since in the loop, it's assumed the object is stationary.

Please have a look at the below and let me know if it's the correct implementation of a 2-D kalman filter?

note I have set process errors and observation errors to zero for now.

import numpy as np
class KalmanFilter(object):
 def __init__(self, delta_t,
 initial_position,
 initial_velocity,
 acceleration,
 calc_error_in_position,
 calc_error_in_velocity,
 obs_error_in_position,
 obs_error_in_velocity):
 self.obs_Error = 0.
 self.w = 0.
 self.q = 0.
 self.I = np.array([[1, 0], [0, 1]])
 self.A = np.array([[1, delta_t], [0, 1]])
 self.B = np.array([[.5 * (delta_t**2), delta_t]])
 self.previous_state = np.array(
 [[initial_position], [initial_velocity]])
 self.predicted_state = self._calculate_predicted_state(acceleration)
 self.measured_state = np.zeros((2, 1))
 self.previous_P = np.array([
 [calc_error_in_position**2,
 (calc_error_in_position * calc_error_in_velocity)],
 [(calc_error_in_position * calc_error_in_velocity),
 calc_error_in_velocity**2]
 ])
 self.R = np.array([
 [obs_error_in_position**2,
 (obs_error_in_position * obs_error_in_velocity)],
 [(obs_error_in_position * obs_error_in_velocity), obs_error_in_velocity**2]
 ])
 def _adjust_measured_observation(self, observed_position, observed_velocity):
 obs_X = np.array([[observed_position], [observed_velocity]])
 self.observed_state = self.I * obs_X + self.obs_Error
 def _calculate_predicted_state(self, acceleration):
 self.predicted_state = (
 self.A * self.previous_state) + (self.B * acceleration) + self.w
 def _calculate_predicted_covariance(self):
 self.P = (self.A * self.previous_P * self.A.T) + self.q
 def _update_predicted_convariance(self):
 self.previous_P = np.array(
 (self.I - (self.kalman_gain * self.I)) * self.P)
 def _calculate_kalman_gain(self):
 self.kalman_gain = (self.P * self.I.T) / \
 ((self.I * self.P * self.I.T) + self.R)
 def _calculate_current_position(self):
 self.current_state = self.previous_state + \
 (self.kalman_gain * (self.observed_state - (self.I * self.previous_state)))
 self.previous_state = np.array(self.current_state)
 def get_current_position(self, observed_position, observed_velocity, acceleration):
 self._calculate_predicted_state(acceleration)
 self._calculate_predicted_covariance()
 self._calculate_kalman_gain()
 self._adjust_measured_observation(observed_position, observed_velocity)
 self._calculate_current_position()
 self._update_predicted_convariance()
 return self.previous_state[0][0]
if __name__ == '__main__':
 kf = KalmanFilter(1, 1.5, .5, .1, .3, .4, .3, .8)
 for i in range(20):
 print(kf.get_current_position(2.5, .8, .1))
asked Aug 1, 2017 at 16:26
\$\endgroup\$
1

1 Answer 1

2
\$\begingroup\$

let me know if it's the correct implementation of a 2-D kalman filter

It definitely isn't. You're exclusively using the Hadamard product through the * operator when there are several matrix products @ needed. The rest of this review will go over implementation details without worrying that the implementation is incorrect.

Since self.I is an identity matrix, writing I.T is by definition pointless; this is more evidence that the implementation is incorrect.

Remove (object) since you aren't (or, at least, shouldn't be) in Python 2 anymore.

Since this class regularly mutates its members, it can't be e.g. a NamedTuple or frozen @dataclass, but you can still write __slots__ so that no new members are accidentally set, and you can make several of the arrays read-only.

Add PEP484 type hints.

I should be set from an np.eye() call.

You have several instances of parentheses that are redundant in context due to order of operations, such as .5 * (delta_t**2). To de-noise the code I propose removing several of these.

Make obs_Error lowercase.

Writing self.predicted_state = self._calculate_predicted_state(acceleration) almost certainly violates your intent. _calculate_predicted_state already sets self.predicted_state, but then returns a None that you use to overwrite the same variable!

self.measured_state is unused so delete it.

Instead of

 obs_X = np.array([[observed_position], [observed_velocity]])
 self.observed_state = self.I * obs_X + self.obs_Error

write

 obs_X = np.diag((observed_position, observed_velocity))
 self.observed_state = obs_X + self.obs_error

Avoid writing a line continuation \, and instead wrap a multi-line expression in parens.

Instead of

 self.previous_state = np.array(self.current_state)

just call .copy(); or better yet avoid a referential replace by assigning to [:].

Don't double index like self.previous_state[0][0]. Instead write [0,0].

Your constructor failed to set an initial predicted_state, kalman_gain, P, etc. You should do that, among other reasons because best practice is to avoid assigning new members outside of the constructor, and it takes care of all of the allocation in one shot.

import numpy as np
class KalmanFilter:
 __slots__ = (
 'w', 'q', 'A', 'B', 'I', 'P', 'R',
 'obs_error', 'kalman_gain', 'previous_P',
 'observed_state', 'current_state', 'predicted_state', 'previous_state', 'measured_state',
 )
 def __init__(
 self,
 delta_t: float,
 initial_position: float,
 initial_velocity: float,
 acceleration: float,
 calc_error_in_position: float,
 calc_error_in_velocity: float,
 obs_error_in_position: float,
 obs_error_in_velocity: float,
 ) -> None:
 self.obs_error = 0.
 self.w = 0.
 self.q = 0.
 self.I = np.eye(2)
 self.A = np.array((
 (1, delta_t),
 (0, 1),
 ))
 self.B = np.array([(.5*delta_t**2, delta_t)])
 self.current_state, self.observed_state, self.predicted_state, self.kalman_gain, self.P = np.full(
 shape=(5, 2, 2), fill_value=np.nan,
 )
 self.previous_state = np.array((
 [initial_position],
 [initial_velocity],
 ))
 self._calculate_predicted_state(acceleration)
 self.previous_P = np.array((
 (calc_error_in_position**2, calc_error_in_position*calc_error_in_velocity),
 (calc_error_in_position*calc_error_in_velocity, calc_error_in_velocity**2),
 ))
 self.R = np.array((
 (obs_error_in_position**2, obs_error_in_position*obs_error_in_velocity),
 (obs_error_in_position*obs_error_in_velocity, obs_error_in_velocity**2),
 ))
 for array in (self.I, self.A, self.B, self.R):
 array.flags.writeable = False
 def _adjust_measured_observation(self, observed_position: float, observed_velocity: float) -> None:
 obs_X = np.diag((observed_position, observed_velocity))
 self.observed_state[:] = obs_X + self.obs_error
 def _calculate_predicted_state(self, acceleration: float) -> None:
 self.predicted_state[:] = (
 self.A*self.previous_state + self.B*acceleration + self.w
 )
 def _calculate_predicted_covariance(self) -> None:
 self.P[:] = self.A*self.previous_P*self.A.T + self.q
 def _update_predicted_convariance(self) -> None:
 self.previous_P[:] = (self.I - self.kalman_gain*self.I)*self.P
 def _calculate_kalman_gain(self) -> None:
 self.kalman_gain[:] = self.P*self.I.T / (
 self.I*self.P*self.I.T + self.R
 )
 def _calculate_current_position(self) -> None:
 self.current_state[:] = self.previous_state + self.kalman_gain*(
 self.observed_state - self.I*self.previous_state
 )
 # BUG: previous_state starts as (2, 1), but this sets it to (2, 2). That's definitely wrong.
 # self.previous_state[:] = self.current_state
 self.previous_state = self.current_state
 def get_current_position(
 self, observed_position: float, observed_velocity: float, acceleration: float,
 ) -> float:
 self._calculate_predicted_state(acceleration)
 self._calculate_predicted_covariance()
 self._calculate_kalman_gain()
 self._adjust_measured_observation(observed_position, observed_velocity)
 self._calculate_current_position()
 self._update_predicted_convariance()
 return self.previous_state[0,0]
def test() -> None:
 kf = KalmanFilter(
 delta_t=1, initial_position=1.5, initial_velocity=.5, acceleration=.1,
 calc_error_in_position=.3, calc_error_in_velocity=.4,
 obs_error_in_position=.3, obs_error_in_velocity=.8,
 )
 positions = [
 kf.get_current_position(observed_position=2.5, observed_velocity=.8, acceleration=.1)
 for _ in range(20)
 ]
 expected = (
 2.0 ,
 2.1666666666666666,
 2.25 ,
 2.3 ,
 2.3333333333333333,
 2.3571428571428568,
 2.375 ,
 2.3888888888888889,
 2.4 ,
 2.4090909090909091,
 2.4166666666666667,
 2.423076923076923 ,
 2.4285714285714286,
 2.4333333333333333,
 2.4375 ,
 2.4411764705882355,
 2.4444444444444444,
 2.447368421052632 ,
 2.45 ,
 2.4523809523809526,
 )
 assert np.allclose(positions, expected, atol=0, rtol=1e-15)
if __name__ == '__main__':
 test()
answered Dec 23, 2024 at 16:36
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.