I'm working on an A* search algorithm implementation and this is what I came up with for the open list:

    from heapq import heappush, heappop

    class OpenList(set):
        '''
        This uses a heap for fast retrieval of the node with the lowest path score.
        It also uses a set for efficiently checking if a node is contained.
        '''
        REMOVED = object()

        def __init__(self, *args, **kwargs):
            set.__init__(self, *args, **kwargs)
            self._heap = []

        def add(self, node):
            set.add(self, node)
            heappush(self._heap, (node.get_path_score(), node))

        def remove(self, node):
            set.remove(self, node)
            i = self._heap.index((node.get_path_score(), node))
            self._heap[i] = self.REMOVED

        def pop_node_with_lowest_path_score(self):
            ''' remove and return the node with the lowest path score '''
            item = heappop(self._heap)
            while item is self.REMOVED:
                item = heappop(self._heap)
            set.remove(self, item[1])
            return item[1]

Should I keep the path_score in the heap when I mark the item as removed, as in the following code?

    self._heap[i] = (node.get_path_score(), self.REMOVED)

What do you think about it? What can be made more efficient, simple or cleaner?
1 Answer
1. Bug
You don't properly synchronize the set and the heap in all cases. For example, when taking a union:

    >>> a = OpenList()
    >>> a |= set([1,2,3])
    >>> a
    OpenList([1, 2, 3])
    >>> a._heap
    []

If you insist on your class being a subclass of `set`, then you probably ought to intercept every method that modifies the set (and there are quite a few: `__isub__`, `__ior__`, `__ixor__`, `__iand__`, `discard`, `difference_update`, `intersection_update`, `symmetric_difference_update`, `update`, and `clear`, not to mention `copy`) and ensure that the heap is modified in a corresponding fashion.

But this would be overkill for what ought to be a fairly simple class. What I would do instead would be to inherit from `object` instead of `set`, and implement the set as an instance member `_set`, in the same way that you have `_heap` as an instance member. With this approach, you can ensure that operations on `_set` always have corresponding operations on `_heap` so that they cannot get out of sync.
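
To make the suggestion concrete, here is a minimal sketch of the composition approach. It is not your class: to keep it short I assume each node is its own score (so plain numbers work), and the `__len__` method and the duplicate check in `add` are my own additions.

```python
from heapq import heappush, heappop


class OpenList(object):
    """Sketch: hold a set and a heap as members instead of subclassing set."""

    def __init__(self, nodes=()):
        self._set = set()
        self._heap = []
        for node in nodes:
            self.add(node)

    def __contains__(self, node):
        return node in self._set

    def __len__(self):
        return len(self._set)

    def add(self, node):
        # Every change to _set is paired with a change to _heap.
        # Assumption: a node is its own score, so it can go on the heap directly.
        if node not in self._set:
            self._set.add(node)
            heappush(self._heap, node)

    def pop(self):
        # Remove and return the node with the lowest score.
        node = heappop(self._heap)
        self._set.remove(node)
        return node
```

Because the class no longer inherits from `set`, operations such as `|=` or `update` are simply not available, so there is no way to change the set without going through `add` or `pop`.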
2. Other issues
Your class is not as general as it could be. In particular, it can only be used to store objects which have a `get_path_score` method. Why not allow the user to pass in a `key` function that gets the score for an object? (As in `sorted` and `heapq.nlargest` and similar functions.)

You call the superclass method directly via `set` rather than via the `super()` function. This means that it's not possible for someone to create a new class that subclasses both your class and some other class `C`. (Because the method resolution order will go directly from your class to `set` instead of via the other class `C`.)

In the `remove` method, you replace the removed item with a placeholder. This takes time O(n) because you have to find the item in the heap. But removing the item from the heap and re-heapifying would also take O(n), and it would be simpler (because then there would be no need to handle the placeholder item in the `pop` method). So it's not clear to me that your approach is worth it.

You should think about what you are trying to achieve: if you really want to have fast removal of items, then you need to be cleverer about how you find the removed item in the heap; but if you're happy for removal to be O(n), then you can simplify the code by just deleting the item from the heap and re-heapifying the remainder.
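
If O(n) removal is acceptable, the simpler option might look something like the sketch below. The class name `SimpleQueue` is hypothetical, and the insertion counter stored in each heap entry is my own assumption: it breaks ties between equal scores so that the elements themselves never need to be comparable.

```python
from heapq import heapify, heappush, heappop
from itertools import count


class SimpleQueue(object):
    """Sketch: priority queue with a key function and O(n) removal."""

    def __init__(self, key=lambda x: x):
        self._key = key
        self._heap = []          # entries are (score, tie_breaker, elem)
        self._set = set()
        self._counter = count()  # assumption: insertion order breaks ties

    def __contains__(self, elem):
        return elem in self._set

    def add(self, elem):
        if elem in self._set:
            return  # already queued; keep the set and the heap in step
        self._set.add(elem)
        heappush(self._heap, (self._key(elem), next(self._counter), elem))

    def remove(self, elem):
        self._set.remove(elem)  # raises KeyError if elem is not a member
        # O(n): drop the entry, then restore the heap invariant.
        self._heap = [entry for entry in self._heap if entry[2] != elem]
        heapify(self._heap)

    def pop(self):
        # No placeholder handling needed: every heap entry is live.
        score, _, elem = heappop(self._heap)
        self._set.remove(elem)
        return elem
```

For example, `SimpleQueue(key=len)` orders strings by length, in the same way that `sorted(words, key=len)` would.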
3. Naming and documentation
The data structure used by the A* search algorithm is usually known as a priority queue: that is, a collection of items each of which is associated with a priority (or score), and which can be retrieved in order of their priority. So I think you could choose a better name than `OpenList` for your class.

Your docstring for the class is written with the wrong audience in mind. It contains implementation details ("uses a heap ... also uses a set") but this is not helpful for users of the class, who want to know what it is for and how to use it.

Your class is a good candidate for a doctest or two.

The method name `pop_node_with_lowest_path_score` is absurd. Why not just `pop`?
4. Rewrite
Here's how I'd rewrite the class to fix all the issues discussed above, assuming that you really do want fast removal of items. I'll let you reverse-engineer it to figure out how it works!

    from heapq import heappush, heappop

    class PriorityQueueElement(object):
        """
        A proxy for an element in a priority queue that remembers (and
        compares according to) its score.
        """
        def __init__(self, elem, score):
            self._elem = elem
            self._score = score
            self._removed = False

        def __lt__(self, other):
            return self._score < other._score

    class PriorityQueue(object):
        """
        A priority queue with O(log n) addition, O(1) membership test and
        amortized O(log n) removal.

        The `key` argument to the constructor specifies a function that
        returns the score for an element in the priority queue. (If not
        supplied, an element is its own score).

        The `add` and `remove` methods add and remove elements from the
        queue, and the `pop` method removes and returns the element with
        the lowest score.

        >>> q = PriorityQueue([3, 1, 4])
        >>> q.pop()
        1
        >>> q.add(2); q.pop()
        2
        >>> q.remove(3); q.pop()
        4
        >>> list(q)
        []
        >>> q = PriorityQueue('vext cwm fjord'.split(), key = lambda s:len(s))
        >>> q.pop()
        'cwm'
        """
        def __init__(self, *args, **kwargs):
            self._key = kwargs.pop('key', lambda x:x)
            self._heap = []
            self._dict = {}
            if args:
                for elem in args[0]:
                    self.add(elem)

        def __contains__(self, elem):
            return elem in self._dict

        def __iter__(self):
            return iter(self._dict)

        def add(self, elem):
            """
            Add an element to a priority queue.
            """
            e = PriorityQueueElement(elem, self._key(elem))
            self._dict[elem] = e
            heappush(self._heap, e)

        def remove(self, elem):
            """
            Remove an element from a priority queue.
            If the element is not a member, raise KeyError.
            """
            e = self._dict.pop(elem)
            e._removed = True

        def pop(self):
            """
            Remove and return the element with the smallest score from a
            priority queue.
            """
            while True:
                e = heappop(self._heap)
                if not e._removed:
                    del self._dict[e._elem]
                    return e._elem
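
As a hint for the reverse-engineering, the technique at work is usually called lazy deletion: `remove` only flags the entry, and `pop` discards flagged entries when they reach the top of the heap. The following is a condensed, standalone sketch of that idea, not a replacement for the class above. The names `Entry` and `LazyQueue` are hypothetical, and the handling of an element that is added twice is my own assumption (the class above does not guard against that case).

```python
from heapq import heappush, heappop


class Entry(object):
    """Heap entry that compares by score and carries a 'removed' flag."""

    def __init__(self, elem, score):
        self.elem = elem
        self.score = score
        self.removed = False

    def __lt__(self, other):
        return self.score < other.score


class LazyQueue(object):
    def __init__(self, key=lambda x: x):
        self._key = key
        self._heap = []
        self._entries = {}  # elem -> its live Entry

    def add(self, elem):
        # Assumption: re-adding an element invalidates its older entry.
        if elem in self._entries:
            self._entries[elem].removed = True
        entry = Entry(elem, self._key(elem))
        self._entries[elem] = entry
        heappush(self._heap, entry)

    def remove(self, elem):
        # O(1): flag the entry; the heap itself is left untouched.
        self._entries.pop(elem).removed = True

    def pop(self):
        # Flagged entries are thrown away here, when they surface.
        while True:
            entry = heappop(self._heap)  # IndexError if the queue is empty
            if not entry.removed:
                del self._entries[entry.elem]
                return entry.elem


q = LazyQueue()
for n in [3, 1, 4]:
    q.add(n)
q.remove(1)
heap_size_after_remove = len(q._heap)  # the flagged entry is still on the heap
first = q.pop()                        # skips the flagged entry for 1
heap_size_after_pop = len(q._heap)
```

The cost of a removal is thus deferred to a later `pop`, which is why the docstring above describes removal as amortized O(log n).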