[Python-checkins] python/nondist/sandbox/collections pairing_heap.py, NONE, 1.1 pairing_heap_test.py, NONE, 1.1

rhettinger at users.sourceforge.net rhettinger at users.sourceforge.net
Fri Apr 2 02:46:05 EST 2004


Update of /cvsroot/python/python/nondist/sandbox/collections
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21444
Added Files:
	pairing_heap.py pairing_heap_test.py 
Log Message:
Add Dan Stutzbach's first pass at paired heaps
(after modifying his test suite to use unittests).
The pure python version is being used to work out API issues, develop
thorough tests, and to tease out implementation issues.
Ultimately, this will be added as collections.heap
--- NEW FILE: pairing_heap.py ---
#!/usr/bin/python
"""
Note: this implementation is not currently thread-safe.
The code in this file implements a pairing heap. By default it
operates as a min-heap, but it can also work as a max-heap via a flag
passed to the heap's __init__ routing.
The important papers on pairing heaps are these:
[1] Michael L. Fredman, Robert Sedjewick, Daniel D. Sleator, and
Robert E. Tarjan. The Pairing Heap: A New Form of Self-Adjusting
Heap. Algorithmica 1:111-129, 1986.
[2] John T. Stasko and Jeffrey Scott Vitter. Pairing heaps:
experiments and analysis. Communications of the ACM, Volume 30, Issue
3, Pages 234-249, 1987.
[3] Michael L. Fredman. On the efficiency of pairing heaps and
related data structures. Journal of the ACM, Volume 46, Issue 4,
Pages 473-501, 1999.
There are actually several varieties of pairing heaps. Five are
described in [1], and two more are described in [2]. This
implementation is the basic pairing heaps described in [1], also known
as the twopass variant. There isn't much published evidence that any
of the variants are significantly better than the others.
The twopass pairing heap works as follows.
A pairing heap is a tree that maintains the heap property: each node
has a lower value than all its children. The pairing heap tree need
not be binary nor balanced. The good amortized efficiency of the
pairing heap results from clever algorithms for .adjust_key() and
.delete().
The basic operation in the pairing heap is the comparison-link. Two
nodes are compared, and the largest is linked as a child of the
smaller. To represent the tree, each node maintains a pointer to the
leftmost child and to its right sibling. Each node also maintains a
pointer to its parent. When a comparison-link is performed, the new
child becomes the leftmost child.
[1] suggests a memory optimization where only the right-most sibling
stores a pointer to the parent, and each node maintains a single bit
indicating if it is that rightmost sibling. This theoretically saves
the size of a pointer in each node, but only if we can store a bit for
free. However, storing a bit takes just as much space as storing a
pointer, due to padding. So, we bite the bullet and store the
pointer. Also, [3] shows that the amortized cost of .adjust_key()
is at least Omega(log log n) if the parent pointer is *not* stored.
The amortized cost may be better using the parent pointer.
For .adjust_key(), the topmost element is removed and set aside as
the return value. A left-to-right pass is made over its children,
comparison-linking them pairwise. For example, if there are 6
children labeled A, B, C, D, E, and F, A and B will be
comparison-linked, as will C and D, and E and F. Next, the rightmost
node is repeatedly comparison-linked with its sibling until there is
only one node left. This node is the new root.
For .delete(), first the node is cut from its parent. This results in
a tree rooted at the node being deleted. The .adjust_key()
procedure is invoked on the node, then the resulting tree is
comparison-linked with the original tree.
See [1] for more details. It also includes figures which are helpful
for understanding.
"""
class WrongHeap (Exception):
 """The user attempted to .delete() or .adjust() from the wrong heap.
 This can also occur if the user attempts to delete or adjust a
 node that has already been removed from the heap.
 """
 def __str__(self):
 return 'WrongHeap'
class Underflow (Exception):
 "The user attempted to .peek() or extract() from an empty heap."
 def __str__(self):
 return 'Underflow'
class IncompatibleHeaps (Exception):
 "The user attempted to meld heaps that have different compare operations."
 def __str__(self):
 return 'IncompatibleHeaps'
class InternalError (Exception):
 "This should never get raised in practice."
 def __str__(self):
 return 'InternalError'
class WrongAdjustKeyDirection (Exception):
 "The user attempted to adjust a key in the wrong direction."
 def __str__(self):
 return 'WrongAdjustKeyDirection'
class heap_node:
 """This class is used internally to implement the heap.
 Users will have be able to see references to this data structure,
 as this is neccessary so they can pass the reference to the
 .adjust_key() and .delete() methods. However, the data structure
 should be considered opaque. The only public methods are .value()
 and .values().
 Users should not instantiate heap_node on their own.
 In the C implementation, all of this will be enforced.
 """
 __slots__ = ('_item', '_child', '_sibling', '_parents')
 def __init__(self, item):
 self._item = item
 self._clean()
 def value(self):
 "Return the value associated with this node."
 return self._item
 def _clean(self):
 self._child = None
 self._sibling = None
 self._parent = None
 def _add_child(self, node):
 node._parent = self
 node._sibling = self._child
 self._child = node
 def _cut(self):
 "Remove this node and its children from this node's parent."
 n = self._parent
 if id(n._child) == id(self):
 n._child = self._sibling
 else:
 n = n._child
 try:
 while id(n._sibling) != id(self):
 n = n._sibling
 n._sibling = self._sibling
 except AttributeError:
 raise InternalError
 self._parent = None
 self._sibling = None
 def _extract(self, heap):
 "Link this node's children as a prelude to this node's removal"
 # Make the left-to-right pairing pass
 children = []
 n = self._child
 while n and n._sibling:
 next = n._sibling._sibling
 pair = n._sibling
 n._parent = None
 n._sibling = None
 pair._parent = None
 pair._sibling = None
 children.append(heap._link(n, pair))
 n = next
 if n:
 children.append(n)
 if not children:
 return None
 # Repeated comparison-link to the rightmost node
 root = children[-1]
 for i in range(len(children)-2,-1,-1):
 root = heap._link(root, children[i])
 root._parent = None
 # Return the new root
 return root
 def values(self):
 "Return an unsorted sequence of the values of this part of the tree."
 rv = [self._item,]
 if self._child:
 rv += self._child.values()
 if self._sibling:
 rv += self._sibling.values()
 return rv
class pairing_heap:
 "Implements a min-heap using the pairing-heap algorithm."
 def __init__(self, values=None, cmpfunc=None, key=None, reverse=False):
 self._cmpfunc = cmpfunc
 self._key = key
 self._reverse = reverse
 self._root = None
 self._len = 0
 if values is not None:
 for i in values:
 self.insert(i)
 def empty(self):
 "True if there is nothing in the heap."
 return self._root is None
 def peek(self):
 "Return, but do not remove, the top of the heap."
 if self._root:
 return self._root._item
 else:
 raise Underflow
 def __len__(self):
 "Return the number of items in the heap."
 return self._len
 def _cmp(self, item1, item2):
 "Compare two items using the heap's comparison functions."
 if self._key:
 item1 = getattr(item1, self._key)
 item2 = getattr(item2, self._key)
 if self._cmpfunc:
 c = self._cmpfunc(item1, item2)
 else:
 c = cmp(item1, item2)
 if self._reverse:
 c = -c
 return c
 def _link(self, node1, node2):
 "Perform the comparison-link operation."
 if not node1: return node2
 if not node2: return node1
 c = self._cmp(node1._item, node2._item)
 if c > 0:
 node2._add_child (node1)
 return node2
 else:
 node1._add_child (node2)
 return node1
 def insert(self, item):
 "Insert an item into the heap."
 node = heap_node(item)
 self._root = self._link(node, self._root)
 self._len += 1
 return node
 def meld(self, other):
 """Merge another heap into this one.
 other will be an empty heap after completion
 """
 if not isinstance(other, pairing_heap):
 raise TypeError
 # Check that these heaps are compatible
 if self._cmpfunc != other._cmpfunc or self._reverse != other._reverse \
 or self._key != other._key:
 raise IncompatibleHeaps
 self._root = self._link(other._root, self._root)
 self._len += other._len
 # Destroy other
 other._len = 0
 other._root = None
 def adjust_key(self, node, item):
 """Adjust the value of node to item.
 This must be decrease for a min-heap or an increase for a max-heap.
 If node is not part of this heap, both heaps may become gibberish.
 """
 self._check_heap_node (node)
 if self._cmp(node._item, item) < 0:
 raise WrongAdjustKeyDirection
 node._item = item
 if id(self._root) == id(node):
 return
 # Cut this node out of the tree
 node._cut()
 # Link it with the root
 self._root = self._link(node, self._root)
 def delete(self, node):
 """Delete a node from the middle of the heap.
 If node is not part of this heap, both heaps may become gibberish.
 """
 self._check_heap_node (node)
 if id(self._root) == id(node):
 self.extract()
 return
 node._cut()
 self._root = self._link (node._extract(self), self._root)
 self._len -= 1
 node._clean()
 return node._item
 def extract(self, n=0):
 "Extract the top value of the heap."
 if n > 0:
 return [self.extract() for i in xrange(n)]
 if not self._root:
 raise Underflow
 old_root = self._root
 self._root = self._root._extract(self)
 self._len -= 1
 old_root._clean()
 return old_root._item
 def extract_all(self):
 "Empty the heap into a sorted list of all the values"
 return [self.extract() for i in xrange(self._len)]
 def values(self):
 "Return an unsorted list of all the values in the heap"
 return self._root.values()
 def _check_heap_node(self, node):
 """Raise an error if node will cause problems.
 This function is a simple check to catch certain user errors.
 It will raise TypeError if node isn't a heap_node object at
 all. It *may* raise WrongHeap if node isn't part of this heap.
 It will return if node is part of this heap.
 However, no guarantees are made about the return value if node
 is a heap_node, but part of a different heap.
 """
 if not isinstance(node, heap_node):
 raise TypeError
 if self._root == node:
 return
 if not node._parent:
 raise WrongHeap
 return
--- NEW FILE: pairing_heap_test.py ---
#!/usr/bin/python
from pairing_heap import *
import random
import unittest
datasize = 256
class Holder:
 def __init__(self, item):
 self.xyzzy = item
def check_invariant(heap, node=None):
 if node == None:
 node = heap._root
 if node == None:
 return True
 n2 = node._child
 while n2:
 if heap._cmp(node._item, n2._item) >= 0:
 return False
 check_invariant(heap, n2)
 n2 = n2._sibling
 return True
# Uncomment to speed things up when doing code-coverage analysis
##def check_invariant(heap, node=None):
## return True
class TestPairingHeap(unittest.TestCase):
 def setUp(self):
 self.data = [random.random() for i in xrange(datasize)]
 self.data_sorted = sorted(self.data)
 def test_random(self):
 # Push random numbers and pop them off, verifying all's OK.
 heap = pairing_heap()
 self.assert_(check_invariant(heap))
 for item in self.data:
 heap.insert(item)
 self.assert_(check_invariant(heap))
 results = []
 while not heap.empty():
 item = heap.extract()
 self.assert_(check_invariant(heap))
 results.append(item)
 self.assertEqual(self.data_sorted, results)
 def test_naive_nbest(self):
 heap = pairing_heap()
 for item in self.data:
 heap.insert(item)
 if len(heap) > 10:
 heap.extract()
 result = heap.extract_all()
 self.assertEqual(result, self.data_sorted[-10:])
 def test_underflow(self):
 heap = pairing_heap()
 self.assertRaises(Underflow, heap.extract)
 self.assertRaises(Underflow, heap.peek)
 def test_return_unsorted(self):
 heap = pairing_heap()
 for item in self.data:
 heap.insert(item)
 self.assertEqual(set(heap.values()), set(self.data))
 def test_implicit_creation_and_peek(self):
 heap = pairing_heap(self.data)
 self.assert_(check_invariant(heap))
 self.assertEqual(heap.peek(), self.data_sorted[0])
 self.assert_(check_invariant(heap))
 result = heap.extract_all()
 self.assertEqual(result, self.data_sorted)
 def test_extract_with_arg(self):
 heap = pairing_heap(self.data)
 low10 = heap.extract(10)
 self.assert_(check_invariant(heap))
 self.assertEqual(low10, self.data_sorted[:10])
 def test_deleting_from_the_middle(self):
 heap = pairing_heap()
 nodes = [heap.insert(item) for item in self.data]
 data2 = self.data[:]
 count = 0
 for i in xrange(datasize-1,-1,-5):
 heap.delete(nodes[i])
 del nodes[i]
 self.assert_(check_invariant(heap))
 count += 1
 self.assertEqual(datasize-count, len(heap))
 del data2[i]
 result = heap.extract_all()
 data2_sorted = data2[:]
 data2_sorted.sort()
 self.assertEqual(data2_sorted, result)
 def test_deleting_everything_in_random_order(self):
 heap = pairing_heap()
 nodes = [heap.insert(item) for item in self.data]
 for i in xrange(datasize):
 t = random.randrange(len(heap))
 heap.delete(nodes[t])
 del nodes[t]
 self.assert_(check_invariant(heap))
 self.assert_(heap.empty())
 def test_adjust_key(self):
 heap = pairing_heap()
 nodes = [heap.insert(item) for item in self.data]
 data2 = []
 for i in xrange(datasize):
 data2.append(min(self.data[i], random.random()))
 idx = range(datasize)
 random.shuffle(idx)
 for i in idx:
 heap.adjust_key(nodes[i], data2[i])
 self.assert_(check_invariant(heap))
 self.assertEqual(sorted(data2), heap.extract_all())
 def test_build_tree_by_melding(self):
 heap = pairing_heap()
 i = 0
 while i < datasize:
 j = max(random.randrange(min(datasize - i, 5)), 1)
 heap2 = pairing_heap(self.data[i:i+j])
 i += j
 heap.meld(heap2)
 self.assert_(check_invariant(heap))
 self.assertEqual(self.data_sorted, heap.extract_all())
 def test_cmpfunc(self):
 reverse = lambda x, y: -cmp(x,y)
 heap = pairing_heap(self.data, cmpfunc=reverse)
 self.assertEqual(sorted(self.data, reverse=True), heap.extract_all())
 def test_reverse(self):
 heap = pairing_heap(self.data, reverse=True)
 self.assertEqual(sorted(self.data, reverse=True), heap.extract_all())
 def test_cmpfunc_with_reverse(self):
 reverse = lambda x, y: -cmp(x,y)
 heap = pairing_heap(self.data, cmpfunc=reverse, reverse=True)
 self.assertEqual(self.data_sorted, heap.extract_all())
 def test_key(self):
 holders = [Holder(item) for item in self.data]
 heap = pairing_heap(holders, key='xyzzy')
 results = heap.extract_all()
 self.assertEqual(self.data_sorted,[item.xyzzy for item in results])
 def test_extract_values_from_nodes(self):
 heap = pairing_heap()
 nodes = [heap.insert(datum) for datum in self.data]
 for i in xrange(len(nodes)):
 self.assertEqual(nodes[i].value(), self.data[i])
 self.assert_(check_invariant(heap))
 def test_meld_with_incompatible_heap(self):
 heap1 = pairing_heap()
 reverse = lambda x, y: -cmp(x,y)
 self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(cmpfunc = reverse))
 self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(reverse = True))
 self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(key = 'xyzzy'))
 def test_adj_key_wrong_direction(self):
 heap = pairing_heap ()
 nodes = [heap.insert(datum) for datum in range(5)]
 self.assertRaises(WrongAdjustKeyDirection, heap.adjust_key, nodes[4], 100)
 def test_adjust_key_on_existing_root(self):
 heap = pairing_heap ()
 data = self.data
 nodes = [heap.insert(datum) for datum in data]
 low = 0
 for i in xrange(1, len(data)):
 if data[i] == data[low]:
 low_list.append (i)
 if data[i] < data[low]:
 low = i
 low_list = [i]
 for i in low_list:
 heap.adjust_key(nodes[i], data[i] - random.random())
 def test_bogus_node_parameter(self):
 heap = pairing_heap ()
 nodes = [heap.insert(datum) for datum in self.data]
 self.assertRaises(TypeError, heap.adjust_key, "muhahahah!", 5)
 self.assert_(check_invariant(heap))
 self.assertRaises(TypeError, heap.delete, "muhahahah!")
 def test_semibogus_node_parameter(self):
 holders = [Holder(item) for item in self.data]
 heap = pairing_heap(key='xyzzy')
 nodes = [heap.insert(datum) for datum in holders]
 for i in xrange(len(holders)):
 holders[i].node = nodes[i]
 top = heap.extract ()
 self.assertRaises(WrongHeap, heap.delete, top.node)
 self.assert_(check_invariant(heap))
if __name__ == '__main__':
 suite = unittest.TestSuite()
 suite.addTest(unittest.makeSuite(TestPairingHeap))
 unittest.TextTestRunner(verbosity=2).run(suite)


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /