2
\$\begingroup\$

The integration field needs to get recalculated every time the starting or goal coordinates change. I wanted to use the flow field algorithm for a problem that includes bigger mazes with changing starting or ending points every 0.5 seconds. Right now the algorithm is far too slow to be useful because the integration field needs to get recalculated every time the starting or ending point changes.

Has someone an idea how to reduce the running time of the method create_integration_field in class FlowField? I'm glad about every hint! (If you are good with numpy, I'd appreciate a possible solution using it, too.) Thank you in advance:

import numpy as np
import matplotlib.pyplot as plt
show_animation = True
def draw_horizontal_line(start_x, start_y, length, o_x, o_y, o_dict, path):
 for i in range(start_x, start_x + length):
 for j in range(start_y, start_y + 2):
 o_x.append(i)
 o_y.append(j)
 o_dict[(i, j)] = path
def draw_vertical_line(start_x, start_y, length, o_x, o_y, o_dict, path):
 for i in range(start_x, start_x + 2):
 for j in range(start_y, start_y + length):
 o_x.append(i)
 o_y.append(j)
 o_dict[(i, j)] = path
class FlowField:
 def __init__(self, obs_grid, goal_x, goal_y, start_x, start_y,
 limit_x, limit_y):
 self.start_pt = [start_x, start_y]
 self.goal_pt = [goal_x, goal_y]
 self.obs_grid = obs_grid
 self.limit_x, self.limit_y = limit_x, limit_y
 self.cost_field = {}
 self.integration_field = {}
 self.vector_field = {}
 def find_path(self):
 self.create_cost_field()
 self.create_integration_field()
 self.assign_vectors()
 self.follow_vectors()
 def create_cost_field(self):
 """Assign cost to each grid which defines the energy
 it would take to get there."""
 for i in range(self.limit_x):
 for j in range(self.limit_y):
 if self.obs_grid[(i, j)] == 'free':
 self.cost_field[(i, j)] = 1
 elif self.obs_grid[(i, j)] == 'medium':
 self.cost_field[(i, j)] = 7
 elif self.obs_grid[(i, j)] == 'hard':
 self.cost_field[(i, j)] = 20
 elif self.obs_grid[(i, j)] == 'obs':
 continue
 if [i, j] == self.goal_pt:
 self.cost_field[(i, j)] = 0
 def create_integration_field(self):
 """Start from the goal node and calculate the value
 of the integration field at each node. Start by
 assigning a value of infinity to every node except
 the goal node which is assigned a value of 0. Put the
 goal node in the open list and then get its neighbors
 (must not be obstacles). For each neighbor, the new
 cost is equal to the cost of the current node in the
 integration field (in the beginning, this will simply
 be the goal node) + the cost of the neighbor in the
 cost field + the extra cost (optional). The new cost
 is only assigned if it is less than the previously
 assigned cost of the node in the integration field and,
 when that happens, the neighbor is put on the open list.
 This process continues until the open list is empty."""
 for i in range(self.limit_x):
 for j in range(self.limit_y):
 if self.obs_grid[(i, j)] == 'obs':
 continue
 self.integration_field[(i, j)] = np.inf
 if [i, j] == self.goal_pt:
 self.integration_field[(i, j)] = 0
 open_list = [(self.goal_pt, 0)]
 while open_list:
 curr_pos, curr_cost = open_list[0]
 curr_x, curr_y = curr_pos
 for i in range(-1, 2):
 for j in range(-1, 2):
 x, y = curr_x + i, curr_y + j
 if self.obs_grid[(x, y)] == 'obs':
 continue
 if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
 e_cost = 10
 else:
 e_cost = 14
 neighbor_energy = self.cost_field[(x, y)]
 neighbor_old_cost = self.integration_field[(x, y)]
 neighbor_new_cost = curr_cost + neighbor_energy + e_cost
 if neighbor_new_cost < neighbor_old_cost:
 self.integration_field[(x, y)] = neighbor_new_cost
 open_list.append(([x, y], neighbor_new_cost))
 del open_list[0]
 def assign_vectors(self):
 """For each node, assign a vector from itself to the node with
 the lowest cost in the integration field. An agent will simply
 follow this vector field to the goal"""
 for i in range(self.limit_x):
 for j in range(self.limit_y):
 if self.obs_grid[(i, j)] == 'obs':
 continue
 if [i, j] == self.goal_pt:
 self.vector_field[(i, j)] = (None, None)
 continue
 offset_list = [(i + a, j + b)
 for a in range(-1, 2)
 for b in range(-1, 2)]
 neighbor_list = [{'loc': pt,
 'cost': self.integration_field[pt]}
 for pt in offset_list
 if self.obs_grid[pt] != 'obs']
 neighbor_list = sorted(neighbor_list, key=lambda x: x['cost'])
 best_neighbor = neighbor_list[0]['loc']
 self.vector_field[(i, j)] = best_neighbor
 
 def follow_vectors(self):
 curr_x, curr_y = self.start_pt
 while curr_x is not None and curr_y is not None:
 curr_x, curr_y = self.vector_field[(curr_x, curr_y)]
 if curr_x is None or curr_y is None:
 break
 if show_animation:
 plt.plot(curr_x, curr_y, "b*")
 plt.pause(0.001)
 if show_animation:
 plt.show()
def main():
 # set obstacle positions
 obs_dict = {}
 for i in range(51):
 for j in range(51):
 obs_dict[(i, j)] = 'free'
 o_x, o_y, m_x, m_y, h_x, h_y = [], [], [], [], [], []
 s_x = 5.0
 s_y = 5.0
 g_x = 35.0
 g_y = 45.0
 # draw outer border of maze
 draw_vertical_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
 draw_vertical_line(48, 0, 50, o_x, o_y, obs_dict, 'obs')
 draw_horizontal_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
 draw_horizontal_line(0, 48, 50, o_x, o_y, obs_dict, 'obs')
 # draw inner walls
 all_x = [10, 10, 10, 15, 20, 20, 30, 30, 35, 30, 40, 45]
 all_y = [10, 30, 45, 20, 5, 40, 10, 40, 5, 40, 10, 25]
 all_len = [10, 10, 5, 10, 10, 5, 20, 10, 25, 10, 35, 15]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_vertical_line(x, y, l, o_x, o_y, obs_dict, 'obs')
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [35, 40, 15, 10, 45, 20, 10, 15, 25, 45, 10, 30, 10, 40]
 all_y = [5, 10, 15, 20, 20, 25, 30, 35, 35, 35, 40, 40, 45, 45]
 all_len = [10, 5, 10, 10, 5, 5, 10, 5, 10, 5, 10, 5, 5, 5]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_horizontal_line(x, y, l, o_x, o_y, obs_dict, 'obs')
 # Some points are assigned a slightly higher energy value in the cost
 # field. For example, if an agent wishes to go to a point, it might
 # encounter different kind of terrain like grass and dirt. Grass is
 # assigned medium difficulty of passage (color coded as green on the
 # map here). Dirt is assigned hard difficulty of passage (color coded
 # as brown here). Hence, this algorithm will take into account how
 # difficult it is to go through certain areas of a map when deciding
 # the shortest path.
 # draw paths that have medium difficulty (in terms of going through them)
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [10, 45]
 all_y = [22, 20]
 all_len = [8, 5]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_vertical_line(x, y, l, m_x, m_y, obs_dict, 'medium')
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [20, 30, 42] + [47] * 5
 all_y = [35, 30, 38] + [37 + i for i in range(2)]
 all_len = [5, 7, 3] + [1] * 3
 for x, y, l in zip(all_x, all_y, all_len):
 draw_horizontal_line(x, y, l, m_x, m_y, obs_dict, 'medium')
 # draw paths that have hard difficulty (in terms of going through them)
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [15, 20, 35]
 all_y = [45, 20, 35]
 all_len = [3, 5, 7]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_vertical_line(x, y, l, h_x, h_y, obs_dict, 'hard')
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [30] + [47] * 5
 all_y = [10] + [37 + i for i in range(2)]
 all_len = [5] + [1] * 3
 for x, y, l in zip(all_x, all_y, all_len):
 draw_horizontal_line(x, y, l, h_x, h_y, obs_dict, 'hard')
 if show_animation:
 plt.plot(o_x, o_y, "sr")
 plt.plot(m_x, m_y, "sg")
 plt.plot(h_x, h_y, "sy")
 plt.plot(s_x, s_y, "og")
 plt.plot(g_x, g_y, "o")
 plt.grid(True)
 flow_obj = FlowField(obs_dict, g_x, g_y, s_x, s_y, 50, 50)
 flow_obj.find_path()
if __name__ == '__main__':
 main()
asked Mar 30, 2021 at 14:20
\$\endgroup\$

1 Answer 1

2
\$\begingroup\$

The code, at least in terms of style, is already pretty decent. Consider:

  • Add PEP484 type hints

  • Your draw_*_line methods have a confusing name - they don't actually draw anything; they're data construction/setup routines. Also, rather than mutating o_x, o_y and o_dict, entirely eliminating side-effects from this function is easy - just return two lists and a dict, and at the calling side call two extends and an update, respectively.

  • start_pt and goal_pt should use tuples, not lists

  • For this code:

     for i in range(self.limit_x):
     for j in range(self.limit_y):
    

Since it's repeated several times, consider factoring it out into a generator function that yields coordinate tuples.

  • Consider replacing your 'obs', etc. strings with Enum values
  • Use a set instead of a list for membership checks in if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
  • For your neighbor_list, rather than a weakly-typed dictionary, make a class - a @dataclass would suit

An implementation that does some of the above:

from typing import Tuple, Dict, List
import numpy as np
import matplotlib.pyplot as plt
show_animation = True
Coord = Tuple[int, int]
def draw_horizontal_line(
 start_x: int, start_y: int, length: int,
 o_x: List[int], o_y: List[int],
 o_dict: Dict[Coord, str],
 path: str,
) -> None:
 for i in range(start_x, start_x + length):
 for j in range(start_y, start_y + 2):
 o_x.append(i)
 o_y.append(j)
 o_dict[(i, j)] = path
def draw_vertical_line(
 start_x: int, start_y: int, length: int,
 o_x: List[int], o_y: List[int],
 o_dict: Dict[Coord, str],
 path: str,
) -> None:
 for i in range(start_x, start_x + 2):
 for j in range(start_y, start_y + length):
 o_x.append(i)
 o_y.append(j)
 o_dict[(i, j)] = path
class FlowField:
 def __init__(
 self,
 obs_grid: Dict[Coord, str],
 goal_x: float, goal_y: float,
 start_x: float, start_y: float,
 limit_x: int, limit_y: int,
 ):
 self.start_pt = (start_x, start_y)
 self.goal_pt = (goal_x, goal_y)
 self.obs_grid = obs_grid
 self.limit_x, self.limit_y = limit_x, limit_y
 self.cost_field: Dict[Coord, int] = {}
 self.integration_field: Dict[Coord, int] = {}
 self.vector_field: Dict[Coord, Coord] = {}
 def find_path(self) -> None:
 self.create_cost_field()
 self.create_integration_field()
 self.assign_vectors()
 self.follow_vectors()
 def create_cost_field(self) -> None:
 """Assign cost to each grid which defines the energy
 it would take to get there."""
 for i in range(self.limit_x):
 for j in range(self.limit_y):
 coord = (i, j)
 if self.obs_grid[coord] == 'free':
 self.cost_field[coord] = 1
 elif self.obs_grid[coord] == 'medium':
 self.cost_field[coord] = 7
 elif self.obs_grid[coord] == 'hard':
 self.cost_field[coord] = 20
 elif self.obs_grid[coord] == 'obs':
 continue
 if coord == self.goal_pt:
 self.cost_field[coord] = 0
 def create_integration_field(self) -> None:
 """Start from the goal node and calculate the value
 of the integration field at each node. Start by
 assigning a value of infinity to every node except
 the goal node which is assigned a value of 0. Put the
 goal node in the open list and then get its neighbors
 (must not be obstacles). For each neighbor, the new
 cost is equal to the cost of the current node in the
 integration field (in the beginning, this will simply
 be the goal node) + the cost of the neighbor in the
 cost field + the extra cost (optional). The new cost
 is only assigned if it is less than the previously
 assigned cost of the node in the integration field and,
 when that happens, the neighbor is put on the open list.
 This process continues until the open list is empty."""
 for i in range(self.limit_x):
 for j in range(self.limit_y):
 if self.obs_grid[(i, j)] == 'obs':
 continue
 self.integration_field[(i, j)] = np.inf
 if (i, j) == self.goal_pt:
 self.integration_field[(i, j)] = 0
 open_list = [(self.goal_pt, 0)]
 while open_list:
 curr_pos, curr_cost = open_list[0]
 curr_x, curr_y = curr_pos
 for i in range(-1, 2):
 for j in range(-1, 2):
 x, y = curr_x + i, curr_y + j
 if self.obs_grid[(x, y)] == 'obs':
 continue
 if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
 e_cost = 10
 else:
 e_cost = 14
 neighbor_energy = self.cost_field[(x, y)]
 neighbor_old_cost = self.integration_field[(x, y)]
 neighbor_new_cost = curr_cost + neighbor_energy + e_cost
 if neighbor_new_cost < neighbor_old_cost:
 self.integration_field[(x, y)] = neighbor_new_cost
 open_list.append(([x, y], neighbor_new_cost))
 del open_list[0]
 def assign_vectors(self) -> None:
 """For each node, assign a vector from itself to the node with
 the lowest cost in the integration field. An agent will simply
 follow this vector field to the goal"""
 for i in range(self.limit_x):
 for j in range(self.limit_y):
 if self.obs_grid[(i, j)] == 'obs':
 continue
 if (i, j) == self.goal_pt:
 self.vector_field[(i, j)] = (None, None)
 continue
 offset_list = [(i + a, j + b)
 for a in range(-1, 2)
 for b in range(-1, 2)]
 neighbor_list = [{'loc': pt,
 'cost': self.integration_field[pt]}
 for pt in offset_list
 if self.obs_grid[pt] != 'obs']
 neighbor_list = sorted(neighbor_list, key=lambda x: x['cost'])
 best_neighbor = neighbor_list[0]['loc']
 self.vector_field[(i, j)] = best_neighbor
 def follow_vectors(self) -> None:
 curr_x, curr_y = self.start_pt
 while curr_x is not None and curr_y is not None:
 curr_x, curr_y = self.vector_field[(curr_x, curr_y)]
 if curr_x is None or curr_y is None:
 break
 if show_animation:
 plt.plot(curr_x, curr_y, "b*")
 plt.pause(0.001)
 if show_animation:
 plt.show()
def main():
 # set obstacle positions
 obs_dict = {}
 for i in range(51):
 for j in range(51):
 obs_dict[(i, j)] = 'free'
 o_x, o_y, m_x, m_y, h_x, h_y = [], [], [], [], [], []
 s_x = 5.0
 s_y = 5.0
 g_x = 35.0
 g_y = 45.0
 # draw outer border of maze
 draw_vertical_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
 draw_vertical_line(48, 0, 50, o_x, o_y, obs_dict, 'obs')
 draw_horizontal_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
 draw_horizontal_line(0, 48, 50, o_x, o_y, obs_dict, 'obs')
 # draw inner walls
 all_x = [10, 10, 10, 15, 20, 20, 30, 30, 35, 30, 40, 45]
 all_y = [10, 30, 45, 20, 5, 40, 10, 40, 5, 40, 10, 25]
 all_len = [10, 10, 5, 10, 10, 5, 20, 10, 25, 10, 35, 15]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_vertical_line(x, y, l, o_x, o_y, obs_dict, 'obs')
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [35, 40, 15, 10, 45, 20, 10, 15, 25, 45, 10, 30, 10, 40]
 all_y = [5, 10, 15, 20, 20, 25, 30, 35, 35, 35, 40, 40, 45, 45]
 all_len = [10, 5, 10, 10, 5, 5, 10, 5, 10, 5, 10, 5, 5, 5]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_horizontal_line(x, y, l, o_x, o_y, obs_dict, 'obs')
 # Some points are assigned a slightly higher energy value in the cost
 # field. For example, if an agent wishes to go to a point, it might
 # encounter different kind of terrain like grass and dirt. Grass is
 # assigned medium difficulty of passage (color coded as green on the
 # map here). Dirt is assigned hard difficulty of passage (color coded
 # as brown here). Hence, this algorithm will take into account how
 # difficult it is to go through certain areas of a map when deciding
 # the shortest path.
 # draw paths that have medium difficulty (in terms of going through them)
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [10, 45]
 all_y = [22, 20]
 all_len = [8, 5]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_vertical_line(x, y, l, m_x, m_y, obs_dict, 'medium')
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [20, 30, 42] + [47] * 5
 all_y = [35, 30, 38] + [37 + i for i in range(2)]
 all_len = [5, 7, 3] + [1] * 3
 for x, y, l in zip(all_x, all_y, all_len):
 draw_horizontal_line(x, y, l, m_x, m_y, obs_dict, 'medium')
 # draw paths that have hard difficulty (in terms of going through them)
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [15, 20, 35]
 all_y = [45, 20, 35]
 all_len = [3, 5, 7]
 for x, y, l in zip(all_x, all_y, all_len):
 draw_vertical_line(x, y, l, h_x, h_y, obs_dict, 'hard')
 all_x[:], all_y[:], all_len[:] = [], [], []
 all_x = [30] + [47] * 5
 all_y = [10] + [37 + i for i in range(2)]
 all_len = [5] + [1] * 3
 for x, y, l in zip(all_x, all_y, all_len):
 draw_horizontal_line(x, y, l, h_x, h_y, obs_dict, 'hard')
 if show_animation:
 plt.plot(o_x, o_y, "sr")
 plt.plot(m_x, m_y, "sg")
 plt.plot(h_x, h_y, "sy")
 plt.plot(s_x, s_y, "og")
 plt.plot(g_x, g_y, "o")
 plt.grid(True)
 flow_obj = FlowField(obs_dict, g_x, g_y, s_x, s_y, 50, 50)
 flow_obj.find_path()
if __name__ == '__main__':
 main()
answered Mar 30, 2021 at 16:45
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.