It has been a while since taking the class Algorithms and Data Structures, so the following module was written as an exercise for implementing Python's collections.deque
class in pure Python without looking at its code in C. The Deque
class appears to pass all tests within the test.test_deque
module, but a lot of patches were made to the code before that became possible. As a result, the code is a mess, and there are many specific errors and style problems throughout the program. Fortunately, my interests are not in specific details.
Are there any glaring problems in the overall architecture of the Deque
class or any of its supporting classes? Supporting examples of the problems are welcome as are suggestions on improving the overall design of the code.
import copy
import functools
import gc
import itertools
import math
import operator
import test.support
import test.test_deque
import threading
import unittest
__all__ = ['Deque']
def main():
test.test_deque.deque = Deque
test.test_deque.TestBasic.test_sizeof = test.support.anticipate_failure(
True
)(test.test_deque.TestBasic.test_sizeof)
test.test_deque.TestSubclass.assertRaises = lambda *args, **kwargs: (
gc.collect(),
unittest.TestCase.assertRaises(*args, **kwargs)
)[1]
test.test_deque.test_main(True)
class AtomicMeta(type):
def __new__(mcs, name, bases, class_dict):
for key, value in class_dict.items():
if callable(value):
class_dict[key] = mcs.wrap(value)
# if '__slots__' not in class_dict:
# class_dict['__slots__'] = ('_AtomicMeta__mutex',)
# else:
# class_dict['__slots__'] += ('_AtomicMeta__mutex',)
class_dict['__new__'] = mcs.atomic_new
return super().__new__(mcs, name, bases, class_dict)
@staticmethod
def wrap(func):
@functools.wraps(func)
def atomic(self, *args, **kwargs):
try:
mutex = self.__mutex
except AttributeError:
mutex = self.__mutex = threading.RLock()
with mutex:
return func(self, *args, **kwargs)
return atomic
@staticmethod
def atomic_new(cls, *args, **kwargs):
instance = object.__new__(cls)
instance.__mutex = threading.RLock()
return instance
class Deque(metaclass=AtomicMeta):
# __slots__ = (
# '__max_len',
# '_left_root',
# '_right_root',
# '__len',
# '_guard',
# '__repr',
# '_version'
# )
def __init__(self, iterable=None, maxlen=None):
if maxlen is not None:
if not isinstance(maxlen, int):
raise TypeError('max_len must be None or int')
if maxlen < 0:
raise ValueError('max_len must be >= zero')
self.__max_len = maxlen
else:
self.__max_len = math.inf
self._left_root = self._right_root = None
self.__len = 0
self._guard = IterationGuard(False)
self.__repr = False
self._version = 0
if iterable is not None:
self.extend(iterable)
def append(self, x):
with self._guard.mute:
if self._right_root is None:
self._left_root = self._right_root = Node(x, None, None, self)
else:
self._right_root.append(x)
self.__len += 1
if self.__len > self.__max_len:
self.popleft()
self._version += 1
def append_left(self, x):
with self._guard.mute:
if self._left_root is None:
self._left_root = self._right_root = Node(x, None, None, self)
else:
self._left_root.append_left(x)
self.__len += 1
if self.__len > self.__max_len:
self.pop()
self._version += 1
appendleft = append_left
def clear(self):
with self._guard.mute:
self._left_root = self._right_root = None
self.__len = 0
self._version += 1
def copy(self):
with self._guard.iter:
version = self._version
result = type(self)(self, self.max_len)
if self._version != version:
raise RuntimeError()
return result
def count(self, x):
with self._guard.iter:
version = self._version
node, count = self._left_root, 0
while node is not None:
count += node.value == x
node = node.right
if self._version != version:
raise RuntimeError()
return count
def extend(self, iterable):
for item in (tuple(self) if iterable is self else iterable):
self.append(item)
def extend_left(self, iterable):
for item in (tuple(self) if iterable is self else iterable):
self.append_left(item)
extendleft = extend_left
def index(self, x, start=None, stop=None):
start = 0 if start is None else self.__get_index(start, False)
stop = len(self) if stop is None else self.__get_index(stop, False)
with self._guard.iter:
version = self._version
node = self._left_root
for index in itertools.count():
if self._version != version:
raise RuntimeError()
if node is None:
raise ValueError('item could not be found')
if node.value == x and start <= index < stop:
break
node = node.right
if self._version != version:
raise RuntimeError()
return index
def insert(self, i, x):
if len(self) == self.max_len:
raise IndexError('cannot grow anymore')
index = self.__get_index(i, False)
if index >= len(self):
self.append(x)
else:
if index < 0:
index = 0
node = self.__get_node(index)
with self._guard.mute:
if node.left is None:
self._left_root = node.left = Node(x, None, node, self)
else:
node.left.right = node.left = Node(
x, node.left, node, self
)
self.__len += 1
self._version += 1
def pop(self):
if self._right_root is None:
raise IndexError('pop from an empty deque')
with self._guard.mute:
value = self._right_root.value
self._right_root = self._right_root.left
if self._right_root is None:
self._left_root = None
else:
self._right_root.right = None
self.__len -= 1
self._version += 1
return value
def popleft(self):
if self._left_root is None:
raise IndexError('pop from an empty deque')
with self._guard.mute:
value = self._left_root.value
self._left_root = self._left_root.right
if self._left_root is None:
self._right_root = None
else:
self._left_root.left = None
self.__len -= 1
self._version += 1
return value
def remove(self, value):
node = self._left_root
version = self._version
while node is not None:
if node.value == value:
if self._version != version:
raise IndexError()
break
else:
node = node.right
else:
raise ValueError('value could not be found')
self.__del_node(node)
def reverse(self):
if self._left_root is not None:
with self._guard.mute:
self._left_root, self._right_root = \
self._right_root, self._left_root
node = self._left_root
while node is not None:
node.left, node.right = node.right, node.left
node = node.right
self._version += 1
def rotate(self, n=1):
size = len(self)
if size > 0:
with self._guard.mute:
for _ in range(n % size):
self.append_left(self.pop())
self._version += 1
# Support for the following methods are left as an exercise for the reader.
@property
def max_len(self):
return None if self.__max_len == math.inf else self.__max_len
maxlen = max_len
def __iter__(self):
return DequeIter(self)
def __getstate__(self):
return (
self.__max_len,
self._left_root,
self._right_root,
self.__len,
self._guard,
self.__repr,
self._version
)
def __setstate__(self, state):
self.__max_len, self._left_root, self._right_root, self.__len, \
self._guard, self.__repr, self._version = state
def __len__(self):
return self.__len
def __reversed__(self):
return ReverseIter(self)
def __copy__(self):
return self.copy()
def __deepcopy__(self, memo):
return type(self)(map(lambda item: copy.deepcopy(item, memo), self))
def __contains__(self, item):
with self._guard.iter:
version = self._version
node = self._left_root
while node is not None:
if node.value == item:
return True
node = node.right
if self._version != version:
raise RuntimeError()
return False
def __getitem__(self, key):
index = self.__get_index(key)
return self.__get_node(index).value
def __setitem__(self, key, value):
index = self.__get_index(key)
node = self.__get_node(index)
node.value = value
def __delitem__(self, key):
index = self.__get_index(key)
node = self.__get_node(index)
with self._guard.mute:
self.__del_node(node)
self._version += 1
def __get_index(self, key, check=True):
if key < 0:
key = len(self) + key
if check and (key < 0 or key >= len(self)):
raise IndexError('index is out of range')
return key
def __get_node(self, index):
with self._guard.iter:
version = self._version
halfway = len(self) >> 1
if index < halfway:
node = self._left_root
for _ in range(index):
node = node.right
return node
node = self._right_root
for _ in range(len(self) - index - 1):
node = node.left
if self._version != version:
raise RuntimeError()
return node
def __del_node(self, node):
with self._guard.mute:
if node.left is None:
self._left_root = node.right
else:
node.left.right = node.right
if node.right is None:
self._right_root = node.left
else:
node.right.left = node.left
self.__len -= 1
self._version += 1
# The following methods are for extra credit.
def __add__(self, other):
if not isinstance(other, type(self)):
raise TypeError(f'other must be of type {type(self).__name__}')
total = self.copy()
total.extend(other)
return total
def __mul__(self, other):
if other < 1:
return type(self)()
product = self.copy()
for _ in range(other - 1):
product.extend(self)
return product
__rmul__ = __mul__
def __imul__(self, other):
if other < 1:
self.clear()
if other > 1:
state = tuple(self)
for _ in range(other - 1):
self.extend(state)
return self
# This should be helpful while trying to debug the code.
def __repr__(self):
if self.__repr:
return '[...]'
self.__repr = True
ml = "" if self.maxlen is None else ", maxlen=" + str(self.maxlen)
representation = f'{type(self).__name__.lower()}({list(self)}{ml})'
self.__repr = False
return representation
# Testing requires a few more methods.
def __lt__(self, other):
for a, b in zip(self, other):
if a < b:
return True
if a > b:
return False
return True
def __le__(self, other):
for a, b in zip(self, other):
if a < b:
return True
if a > b:
return False
return True
def __eq__(self, other):
return type(self) == type(other) and \
len(self) == len(other) and \
all(itertools.starmap(operator.eq, zip(self, other)))
def __ne__(self, other):
return type(self) != type(other) or \
len(self) != len(other) or \
any(itertools.starmap(operator.ne, zip(self, other)))
def __gt__(self, other):
for a, b in zip(self, other):
if a < b:
return False
if a > b:
return True
return False
def __ge__(self, other):
for a, b in zip(self, other):
if a < b:
return False
if a > b:
return True
return False
def __iadd__(self, other):
self.extend(other)
return self
deque = Deque
class Node:
__slots__ = 'value', 'left', 'right', 'parent'
def __init__(self, value, left, right, parent):
self.value = value
self.left = left
self.right = right
self.parent = parent
def __repr__(self):
return f'{type(self).__name__}(' \
f'{self.value!r}, ' \
f'{None if self.left is None else self.left.value!r}, ' \
f'{None if self.right is None else self.right.value!r}, parent)'
def append(self, x):
new_node = type(self)(x, self, None, self.parent)
self.right = new_node
self.parent._right_root = new_node
def append_left(self, x):
new_node = type(self)(x, None, self, self.parent)
self.left = new_node
self.parent._left_root = new_node
def __getstate__(self):
return self.value, self.left, self.right, self.parent
def __setstate__(self, state):
self.value, self.left, self.right, self.parent = state
class DequeIter:
__slots__ = 'parent', 'node', 'guard', 'version'
def __init__(self, parent):
self.parent = parent
self.node = parent._left_root
self.guard = parent._guard.iter
self.version = parent._version
if self.node is not None:
self.guard.__enter__()
def __iter__(self):
return self
def __next__(self):
if self.parent._version != self.version:
raise RuntimeError()
if self.node is None:
raise StopIteration()
node = self.node
self.node = node.right
if self.node is None:
self.guard.__exit__(None, None, None)
return node.value
def __getstate__(self):
return self.parent, self.node, self.guard, self.version
def __setstate__(self, state):
self.parent, self.node, self.guard, self.version = state
class ReverseIter(DequeIter):
__slots__ = ()
def __init__(self, parent):
parent = parent.copy()
parent.reverse()
super().__init__(parent)
class IterationGuard:
__slots__ = '__iter', '__mute'
def __init__(self, debug):
self.__iter = IGAttr('iter', debug)
self.__mute = IGAttr('mute', debug)
self.__iter.other = self.__mute
self.__mute.other = self.__iter
@property
def iter(self):
return self.__iter
@property
def mute(self):
return self.__mute
def __getstate__(self):
return self.__iter, self.__mute
def __setstate__(self, state):
self.__iter, self.__mute = state
class IGAttr:
__slots__ = 'name', 'debug', 'other', 'level'
def __init__(self, name, debug):
self.name = name
self.debug = debug
self.other = None
self.level = 0
def __enter__(self):
if self.other.level > 0 and self.other.name != 'iter':
raise RuntimeError(f'{self.name} is not allowed right now')
self.level += 1
if self.debug:
print(f'{self.name} = {self.level}')
def __exit__(self, exc_type, exc_value, traceback):
self.level -= 1
if self.debug:
print(f'{self.name} = {self.level}')
def __getstate__(self):
return self.name, self.debug, self.other, self.level
def __setstate__(self, state):
self.name, self.debug, self.other, self.level = state
class TestHelp:
@staticmethod
def assertEqual(first, second, msg=None):
assert first == second, msg
@staticmethod
def assertRaises(exception, callable=None, *args, **kwargs):
if callable is None:
return ExceptionChecker(exception)
try:
callable(*args, **kwargs)
except exception:
pass
except:
assert False
else:
assert False
class ExceptionChecker:
def __init__(self, exception):
self.exc = exception
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
return isinstance(exc_val, self.exc)
if __name__ == '__main__':
main()
3 Answers 3
Just reviewing the AtomicMeta
class.
1. Review
There's no docstring. What is the class supposed to do?
There are two mechanisms for initializing the
__mutex
attribute: one in theatomic_new
method, and another in thewrap
method. Why two mechanisms? Is one of them somehow unreliable? But if it is, then why not just use the other? I don't understand this, so have I missed a requirement?AtomicMeta
does not support classes that have__slots__
:>>> class Test1(metaclass=AtomicMeta): ... __slots__ = ['slot'] ... >>> Test1() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "cr178957.py", line 31, in atomic_new instance.__mutex = threading.RLock() AttributeError: 'Class1' object has no attribute '_AtomicMeta__mutex'
To fix this, the
__new__
method needs:if '__slots__' in class_dict: class_dict['__slots__'] += ('_AtomicMeta__mutex',)
AtomicMeta
does not support classes that have a__new__
method:>>> class Test2(metaclass=AtomicMeta): ... def __new__(cls, value): ... instance = super().__new__(cls) ... instance.attr = value ... return instance ... >>> Test2('value').attr Traceback (most recent call last): File "<stdin>", line 1, in <module> AttributeError: 'Test2' object has no attribute 'attr'
The simplest approach for supporting this would seem to be to drop the
atomic_new
method (relying on thewrap
method to initialize the__mutex
attribute, as discussed above), and ensure that the__new__
method does not get wrapped:if key != '__new__' and callable(value): class_dict[key] = mcs.wrap(value)
The
__init__
method also does not need to be wrapped, since at the point where__init__
is called, only one thread has access to the object. So this test can become:if key not in ('__init__', '__new__') and callable(value): class_dict[key] = mcs.wrap(value)
Since
wrap
is called from only one place, it would be simpler to inline it at the point of use.
2. Revised code
import functools
import threading
class AtomicMeta(type):
"Metaclass that serializes access to the methods of each instance."
def __new__(mcs, name, bases, class_dict):
for key, value in class_dict.items():
if key not in ('__init__', '__new__') and callable(value):
@functools.wraps(value)
def atomic(self, *args, **kwargs):
try:
mutex = self._AtomicMeta__mutex
except AttributeError:
mutex = self._AtomicMeta__mutex = threading.RLock()
with mutex:
return value(self, *args, **kwargs)
class_dict[key] = atomic
if '__slots__' in class_dict:
class_dict['__slots__'] += ['_AtomicMeta__mutex']
return super().__new__(mcs, name, bases, class_dict)
The biggest problem I see with this whole thing, and the reason your code hasn't gotten much attention, is the lack of comments. In particular comments about why you're doing something a certain way, or what the purpose (goal) of a section of code is. This makes it cumbersome for someone looking over the code to give the kind of high-level analysis you're asking for. Putting some docstrings in (even though the interface is the same as a standard library) would also help out people like me who sometimes forget the difference between append
and extend
.
- What's the reason for maintaining
version
in the deque itself? I don't undestand theAtomic
stuff, I guess that's supposed to keep it threadsafe, but shouldn't that take care of things without needingversion
? You really need some comments here. - Is there any way to test if the atomicity actualy works?
- Is an unexplained
RuntimeError
the best exception to raise if you detect a failure of atomicity usingversion
? - The exceptions for
__init__()
refer tomax_len
but the named parameter ismaxlen
- Using a temporary
tuple
when duplicating the dequeue inextend()
is quick to write but inefficient. Shouldn't it be possible to keep track using pointers? - In
index()
, I'm not sure if re-usingstart
andstop
that way is easy enough to understand. I don't think it hurts anything to create a separate variable, and name them in keeping with their actual content - Since the
start <= index < stop
test will almost always be quicker than the equality test, shouldn't it be first? - Does the reference implementation actually require a
IndexError
exception if you try to add to a full deque? That seems counterintuitive. - Isn't the last half of
insert
the same asappend_left
? - In the functions that come in left/right pairs, isn't there some way to combine them so the code is not duplicated?
- I'm pretty sure there's a better implementation of
rotate()
that only requires patching four pointers. - Is the "exercise for the reader" comment supposed to be there?
I guess I'd add an alias for "the other side" everywhere a method name is decorated left or right, the way the library grew a bisect_right()
.
For reverse()
, you could swap accessors (including, depending on implementation details, iterators) instead of references - better yet, create a class with swapped accessors and mutate the object on reverse()
.
What about __str__()
?
Explore related questions
See similar questions with these tags.