I wrote code that implements the A* algorithm in Python. A* is a shortest-path algorithm that resembles Dijkstra's algorithm but adds an estimator (heuristic) to guide the search. Pseudocode for both algorithms can be found on their Wikipedia pages.
I know that the networkx Python package includes an A* implementation, but only for a completely defined graph. For my purpose I needed dynamic neighbor generation: in the beginning only the start node is defined, and each time a node is inspected, its missing neighbors are created and added.
I wrote two basic classes that can be extended to fit specific problems. They do not work by themselves, since some methods are problem-specific. The class AStar represents the solver algorithm and the network, and the class Node represents one node in this network.
Astar:

    class AStar:
        """
        Computes shortest path based on A-star algorithm
        """
        def __init__(self):
            self.closedNodes = {}
            self.openNodes = {}
            self.estimatedLengths = {} #Length of path to node + estimated path left
            self.nodesExplored = 0
            self.path = []

        def run(self):
            """
            Runs the solver and sets the solution to self.path
            Returns:
                bool If a shortest path has been found
            """
            foundSolution = False
            while len(self.openNodes) > 0 and not foundSolution:
                self.nodesExplored += 1
                #Select node
                activeNodeKey = min(self.estimatedLengths,key=self.estimatedLengths.get)
                activeNode = self.openNodes[activeNodeKey]
                if(self.isFinished(activeNode)):
                    foundSolution = True
                #close node
                self.closedNodes[activeNodeKey] = activeNode
                del self.openNodes[activeNodeKey]
                del self.estimatedLengths[activeNodeKey]
                for key,node in activeNode.getNeighbors().items():
                    #Skip closed nodes
                    if key in self.closedNodes:
                        continue
                    #Add node if it is new
                    if key not in self.openNodes:
                        self.openNodes[key] = node
                    #update node
                    length = activeNode.length+node.getDistance(activeNode.key)
                    if(length < node.length):
                        #Best path so far to this node
                        node.cameFrom = activeNode
                        node.length = length
                        self.estimatedLengths[key] = length+node.estimate()
            if(foundSolution):
                self.reconstructSolution(activeNode)
                return True
            else:
                return False

        def addStartNode(self,node,length = 0):
            """
            Adds start node and sets length unless length is set to None
            Args:
                node Node
                length Starting length for this node
            """
            self.openNodes[node.key] = node
            if length is not None:
                node.length = 0
            self.estimatedLengths[node.key] = 0

        def reconstructSolution(self,node):
            """
            Gets solution from a node. Sets to self.path
            Args:
                node Node
            """
            nodes = []
            length = 0
            while node.cameFrom is not None:
                nodes.append(node)
                length += node.getDistance(node.cameFrom.key)
                node = node.cameFrom
            nodes.append(node)
            self.path = reversed(nodes)
            self.length = length

        def isFinished(self,activeNode):
            """
            Expandable function.
            Checks if node is a finished node
            Args:
                activeNode: node
            """
            return False
Node:

    class Node:
        """
        Represents a node in the Astar solver
        """
        def __init__(self,astar):
            self.distances = {} #distances to neighbors
            self.length = float('inf') #Length from start to this node
            self.astar = astar #Link to Astar
            self.setKey()
            self.cameFrom = None #Previous node in path

        def getNeighbors(self):
            """
            Gets neighbors of self. Creates if necessary
            Return:
                dictionary keys : nodes
            """
            neighbors = self.createNeighbors()
            nodes = {}
            #Use created node or node from network
            for key,node in neighbors.items():
                if node.key in self.astar.openNodes:
                    nodes[node.key] = self.astar.openNodes[node.key]
                elif node.key in self.astar.closedNodes:
                    nodes[node.key] = self.astar.closedNodes[node.key]
                else:
                    nodes[node.key] = node
            return nodes

        def setKey(self):
            """
            Expandable function
            Generates Key and sets it to self.key. Key has to be unique and represents the node settings
            """
            pass

        def getDistance(self,nodeKey):
            """
            Gets distance to a neighbor
            """
            if nodeKey in self.distances:
                return self.distances[nodeKey]
            else:
                return float('inf')

        def estimate(self):
            """
            Gets estimated distance left to final state.
            When 0, the algorithm turns into Dijkstra
            Returns float
            """
            return 0
To use these classes, the problem-specific methods need to be implemented in subclasses. Here is an example of a one-dimensional problem. Each node has a value (x) and neighbors x+1 and x-1. The path found then runs from the start value to a given target value. Although this is definitely not an interesting path, it shows how to use the main classes.
Astar_1d:

    class AStar_1d(AStar):
        """
        Performs to find shortest path in 1d space to target
        """
        def __init__(self,target):
            """
            Args:
                target int Value of final node
            """
            self.target = target
            super(AStar_1d,self).__init__()

        def isFinished(self,node):
            if(node.x == self.target):
                return True
            else:
                return False
Node_1d:

    class Node_1d(Node):
        """
        Node for Astar_1d
        """
        def __init__(self,astar,x):
            """
            Args:
                astar Astar
                x int position of this node
            """
            self.x = x
            super(Node_1d,self).__init__(astar)

        def createNeighbors(self):
            nodes = {}
            node_left = Node_1d(self.astar,self.x-1)
            node_left.distances[self.key] = 1
            nodes[node_left.key] = node_left
            node_right = Node_1d(self.astar,self.x+1)
            node_right.distances[self.key] = 1
            nodes[node_right.key] = node_right
            return nodes

        def setKey(self):
            self.key = self.x
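The 1D example leaves `estimate` at its default of 0, so the search behaves like Dijkstra. As a rough sketch only, an admissible estimate for this toy problem could be the absolute distance to the target. The names `estimate_1d` and `true_distance` are made up for this illustration and are not part of the classes above.

```python
def estimate_1d(x, target):
    """Hypothetical heuristic: remaining distance on the integer line."""
    return abs(target - x)


def true_distance(x, target):
    """Exact cost when every step to x-1 or x+1 costs 1."""
    steps = 0
    while x != target:
        x += 1 if x < target else -1
        steps += 1
    return steps


# The estimate never exceeds the true remaining cost, so it is admissible.
for start in range(-3, 9):
    assert estimate_1d(start, 5) <= true_distance(start, 5)
```

Inside Node_1d this would presumably become an `estimate` override returning `abs(self.astar.target - self.x)`, since the solver stores the target and every node keeps a link to the solver.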
This is then how to run the code:

    optim = AStar_1d(5) #Creates Astar with goal of 5
    startNode = Node_1d(optim,3) #Create starting node with value of 3
    optim.addStartNode(startNode) #Adds start node to solver
    foundPath = optim.run() #Run solver
    if(foundPath):
        print('Solution found. Nodes explored: %s ' % optim.nodesExplored)
        print('Path length: %s' % optim.length)
        for node in optim.path:
            print(node.x)
Although I appreciate all comments, I am especially looking for comments regarding the speed of this code. I also hope that I didn't do this work for nothing, but if someone knows of an existing Python implementation of the same algorithm, I would be interested to hear about it.
I am also happy to answer any questions about the code or the algorithm in general.
Thanks in advance!
1 Answer
PEP8 asks for underscores, e.g. `closed_nodes`, but at least you're consistent, I'll give you that. Run `flake8` if you wish.
Overall, picking the next node with `min(self.estimatedLengths, key=self.estimatedLengths.get)` means ties for best estimated length are broken implicitly: `min` returns the first minimal key it encounters while iterating the dictionary. I wouldn't mind seeing a comment that touches on the topic of tie breaking, even if it just explicitly says it happens through arbitrary choice. In some symmetric problem domains ties are an interesting concern. Quoting from https://en.wikipedia.org/wiki/A%2A_search_algorithm#Implementation_details
...the way the priority queue handles ties can have a significant effect on performance in some situations. If ties are broken so the queue behaves in a LIFO manner, A* will behave like depth-first search among equal cost paths (avoiding exploring more than one equally optimal solution).
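To make the LIFO idea from that quote concrete, here is a minimal sketch using `heapq`. The `(estimate, -counter, key)` entry layout and the `push`/`pop` helpers are my own assumptions for illustration, not something taken from the code under review.

```python
import heapq
import itertools

counter = itertools.count()
heap = []


def push(heap, estimate, key):
    # Negating the insertion counter makes newer entries win ties (LIFO).
    heapq.heappush(heap, (estimate, -next(counter), key))


def pop(heap):
    estimate, _, key = heapq.heappop(heap)
    return estimate, key


push(heap, 4, "a")
push(heap, 4, "b")
push(heap, 3, "c")
push(heap, 4, "d")

order = [pop(heap)[1] for _ in range(4)]
print(order)  # ['c', 'd', 'b', 'a']
```

Dropping the minus sign would give FIFO behaviour among equal estimates instead.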
This looks expensive: `activeNodeKey = min(...)`. Consider using a priority queue, e.g. `import heapq`.
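As a hedged sketch of what that might look like, here is a compact function-style A* over an implicit graph. It is not a rewrite of your classes: the function `a_star` and its four parameters are hypothetical, keys are assumed to be hashable and mutually comparable (they break ties in the heap), and, like your closed-node skip, it assumes a consistent estimate.

```python
import heapq


def a_star(start, is_goal, neighbors, estimate):
    """Sketch of A* with on-demand neighbor generation.

    neighbors(key) returns (neighbor_key, step_cost) pairs;
    estimate(key) is the heuristic. All names here are illustrative.
    """
    best = {start: 0}            # cheapest known length per key
    came_from = {start: None}
    heap = [(estimate(start), start)]
    closed = set()
    while heap:
        _, key = heapq.heappop(heap)   # O(log n) instead of a linear min() scan
        if key in closed:
            continue                   # stale entry, superseded by a cheaper one
        if is_goal(key):
            path = []
            while key is not None:
                path.append(key)
                key = came_from[key]
            return best[path[0]], path[::-1]
        closed.add(key)
        for nbr, cost in neighbors(key):
            length = best[key] + cost
            if nbr not in closed and length < best.get(nbr, float("inf")):
                best[nbr] = length
                came_from[nbr] = key
                heapq.heappush(heap, (length + estimate(nbr), nbr))
    return None


# The 1D example: start at 3, target 5, unit steps left and right.
length, path = a_star(3, lambda x: x == 5,
                      lambda x: [(x - 1, 1), (x + 1, 1)],
                      lambda x: abs(5 - x))
print(length, path)  # 2 [3, 4, 5]
```

Outdated heap entries are simply skipped when popped, which trades a little extra memory for not having to update entries in place.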
When you iterate `for key,node`, you might choose a more descriptive identifier, such as `for key, nbr`, to indicate you are considering each neighbor node.
In `addStartNode`, this seems weird:

    if length is not None:
        node.length = 0

I expected to see `length` on the RHS rather than zero. It defaults in the method signature. Maybe the API you were looking for was a boolean parameter that requests the length be zeroed.
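One possible shape for this, as a sketch: `Solver` and `StubNode` are stand-ins I made up to keep the example self-contained, and seeding `estimatedLengths` with `node.length + node.estimate()` is my assumption rather than what the original does (it stores 0).

```python
class Solver:
    """Minimal stand-in for AStar, only to show the start-node API."""

    def __init__(self):
        self.openNodes = {}
        self.estimatedLengths = {}

    def addStartNode(self, node, length=0):
        """Registers a start node; pass length=None to keep node.length as is."""
        self.openNodes[node.key] = node
        if length is not None:
            node.length = length  # use the argument rather than a literal 0
        self.estimatedLengths[node.key] = node.length + node.estimate()


class StubNode:
    """Hypothetical node with just the attributes addStartNode touches."""

    def __init__(self, key):
        self.key = key
        self.length = float("inf")

    def estimate(self):
        return 0


solver = Solver()
start = StubNode("a")
solver.addStartNode(start, 2.5)
print(start.length)  # 2.5
```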
The docstring for `isFinished` is not very helpful. It says "Expandable function.", and then the rest is obvious. You might offer guidance about how to override it, or at least point to `AStar_1d` for an example.
In Node your reference to, for example, `self.astar.openNodes[node.key]` feels clunky. Maybe you could have passed in the open/closed maps? Creating a result dict is perfectly nice, and clear, but you might also consider making getNeighbors a generator that `yield`s such values.
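Roughly what I have in mind, as a sketch: a generator that receives the two maps explicitly instead of reaching through `self.astar`. The name `resolve_neighbors` and its parameters are hypothetical, and plain strings stand in for node objects.

```python
def resolve_neighbors(created, open_nodes, closed_nodes):
    """Yields (key, node), preferring nodes the solver already knows.

    created: dict of freshly built neighbor nodes, keyed by node key
    open_nodes / closed_nodes: the solver's two maps, passed in explicitly
    """
    for key, node in created.items():
        if key in open_nodes:
            yield key, open_nodes[key]
        elif key in closed_nodes:
            yield key, closed_nodes[key]
        else:
            yield key, node


created = {1: "new1", 2: "new2", 3: "new3"}
resolved = list(resolve_neighbors(created, {1: "open1"}, {2: "closed2"}))
print(resolved)  # [(1, 'open1'), (2, 'closed2'), (3, 'new3')]
```

The caller would then loop over the generator directly rather than calling `.items()` on a temporary dict.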
The docstring for `setKey()` is helpful. The `pass` is clearly incorrect, in that it doesn't conform to the contract spelled out by the docstring. You might want to unconditionally raise an exception, to ensure the method will be overridden.
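A trimmed-down sketch of that idea follows. The constructors are simplified (no `astar` argument) purely to keep the example short; the `NotImplementedError` message is my own wording.

```python
class Node:
    def __init__(self):
        self.setKey()

    def setKey(self):
        """Must be overridden to set self.key to a unique, hashable value."""
        raise NotImplementedError("subclasses must implement setKey()")


class Node_1d(Node):
    def __init__(self, x):
        self.x = x
        super().__init__()

    def setKey(self):
        self.key = self.x


print(Node_1d(3).key)  # 3
```

Instantiating the base `Node` directly now fails immediately instead of producing a node without a key.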
You specifically asked about the speed of the code. Other than the linear `min()` scan and the needless creation of temporary dict objects, I didn't notice any trouble areas leaping off the page. Looks good.
- Wow. Thanks a lot for this comment. I just have one question: "In Node your reference to, for example, self.astar.openNodes[node.key] feels clunky. Maybe you could have passed in the open/closed maps?" What exactly do you mean by this? For the rest, great comments, and thank you for your time; I will implement them. – user3053216, Aug 30, 2017 at 10:02
- Another question about heapq. Estimated lengths will be overwritten quite often, and if I understand heaps correctly, the old values will stay in there and I have to keep a separate list of the removed nodes. This seems like a lot of extra data, so I am not sure if using heapq is the best choice. What do you think about this? – user3053216, Aug 30, 2017 at 10:42
- I was suggesting that `astar` was maybe a bit too broad, that if it was to support access to 2 maps then making `openNodes` and `closedNodes` parameters may be sensible. It's a style thing, didn't quite feel right. – J_H, Aug 30, 2017 at 16:25
- My heapq remark was motivated by the min() linear scan. What we're examining here is "what kind of operations must we support? how often do they happen? what are their complexities?" So I was trying to replace a linear O(n) operation on a simple data structure with a cheaper operation on a fancier data structure: with a binary heap, peeking at the minimum is O(1), and each push or pop is O(log n). I recommend you write down answers to those three questions to help with making the tradeoffs. As far as cost of deletion goes, remember that you can overwrite with an invalid or tombstone value, deferring the cleanup work for a while. – J_H, Aug 30, 2017 at 16:30
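For the tombstone approach mentioned in the last comment, a minimal sketch might look like the following. It follows the general pattern described in the `heapq` documentation's priority-queue notes; the names `set_estimate`, `pop_lowest`, `entry_finder` and `REMOVED` are assumptions for this illustration, not part of the reviewed code.

```python
import heapq
import itertools

REMOVED = object()            # tombstone marking an outdated entry
heap = []                     # entries are [estimate, count, key]
entry_finder = {}             # key -> live entry
counter = itertools.count()   # unique tie-breaker, so keys are never compared


def set_estimate(key, estimate):
    """Adds a key or replaces its estimate, tombstoning any older entry."""
    if key in entry_finder:
        entry_finder.pop(key)[2] = REMOVED
    entry = [estimate, next(counter), key]
    entry_finder[key] = entry
    heapq.heappush(heap, entry)


def pop_lowest():
    """Pops the live key with the lowest estimate, skipping tombstones."""
    while heap:
        estimate, _, key = heapq.heappop(heap)
        if key is not REMOVED:
            del entry_finder[key]
            return key, estimate
    raise KeyError("pop from an empty queue")


set_estimate("a", 5)
set_estimate("b", 3)
set_estimate("a", 1)     # overwrites the estimate for "a"
print(pop_lowest())      # ('a', 1)
print(pop_lowest())      # ('b', 3)
```

The extra data is one dict entry per live key plus the stale heap entries, which are discarded as soon as they reach the top of the heap.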