5
\$\begingroup\$

It has been a while since taking the class Algorithms and Data Structures, so the following module was written as an exercise for implementing Python's collections.deque class in pure Python without looking at its code in C. The Deque class appears to pass all tests within the test.test_deque module, but a lot of patches were made to the code before that became possible. As a result, the code is a mess, and there are many specific errors and style problems throughout the program. Fortunately, my interests are not in specific details.

Are there any glaring problems in the overall architecture of the Deque class or any of its supporting classes? Supporting examples of the problems are welcome as are suggestions on improving the overall design of the code.

import copy
import functools
import gc
import itertools
import math
import operator
import test.support
import test.test_deque
import threading
import unittest
# Public API of the module: only ``Deque`` is exported by a star-import.
__all__ = ['Deque']
def main():
    """Run the standard library's ``test.test_deque`` suite against ``Deque``.

    The suite's module-level ``deque`` name is rebound to ``Deque`` so that
    every test exercises this implementation.  Two tests are patched first:

    * ``TestBasic.test_sizeof`` is decorated with
      ``test.support.anticipate_failure(True)``.
    * ``TestSubclass.assertRaises`` is replaced by a version that runs a
      garbage collection before delegating to ``unittest.TestCase``.
    """
    suite = test.test_deque
    suite.deque = Deque

    mark_anticipated = test.support.anticipate_failure(True)
    suite.TestBasic.test_sizeof = mark_anticipated(suite.TestBasic.test_sizeof)

    def assert_raises_after_collect(*args, **kwargs):
        # Collect first, then hand over to the stock implementation and
        # return whatever it returns (e.g. the context manager).
        gc.collect()
        return unittest.TestCase.assertRaises(*args, **kwargs)

    suite.TestSubclass.assertRaises = assert_raises_after_collect
    suite.test_main(True)
class AtomicMeta(type):
    """Metaclass that runs the methods of each instance under a private lock.

    Every callable in the class body that takes the instance as its first
    argument is replaced by a wrapper which holds a per-instance
    ``threading.RLock`` for the duration of the call.  The lock is stored on
    the instance under the name ``_AtomicMeta__mutex``; it is created when
    the instance is constructed and, as a fallback, by the first wrapped
    call that finds it missing.

    NOTE: the lock is assigned as an ordinary instance attribute, so a class
    that declares ``__slots__`` has to list ``_AtomicMeta__mutex`` itself.
    """

    def __new__(mcs, name, bases, class_dict):
        """Build the class, wrapping its methods and hooking ``__new__``.

        * A ``__new__`` written in the class body is kept and merely
          extended so that the created instance receives its lock
          (previously it was silently replaced).
        * Without an own ``__new__``, ``atomic_new`` is installed unless a
          base class is already managed by this metaclass, in which case
          the inherited constructor is used.
        * Nested classes and ``staticmethod``/``classmethod`` objects are
          left alone: they are not called with an instance, so the lock
          wrapper would hand them a bogus ``self``.
        """
        namespace = dict(class_dict)
        declared_new = namespace.pop('__new__', None)
        for key, value in namespace.items():
            if callable(value) and not isinstance(
                value, (type, staticmethod, classmethod)
            ):
                # Rebinding an existing key does not resize the dict, so
                # this is safe while iterating over it.
                namespace[key] = mcs.wrap(value)
        if declared_new is not None:
            namespace['__new__'] = mcs._guard_new(declared_new)
        elif not any(isinstance(base, AtomicMeta) for base in bases):
            namespace['__new__'] = mcs.atomic_new
        return super().__new__(mcs, name, bases, namespace)

    @staticmethod
    def wrap(func):
        """Return ``func`` wrapped so that it runs under the instance lock."""
        @functools.wraps(func)
        def atomic(self, *args, **kwargs):
            try:
                mutex = self.__mutex
            except AttributeError:
                # Instance was created without going through our __new__.
                mutex = self.__mutex = threading.RLock()
            with mutex:
                return func(self, *args, **kwargs)
        return atomic

    @staticmethod
    def atomic_new(cls, *args, **kwargs):
        """Create a bare instance of ``cls`` and attach its lock.

        Constructor arguments are accepted but ignored here; they are
        consumed by ``__init__``.
        """
        instance = object.__new__(cls)
        instance.__mutex = threading.RLock()
        return instance

    @staticmethod
    def _guard_new(user_new):
        """Extend a user-written ``__new__`` so the result gets its lock."""
        # An explicitly decorated ``@staticmethod`` carries the function in
        # ``__func__``; a plain function is used as is.
        user_new = getattr(user_new, '__func__', user_new)

        @functools.wraps(user_new)
        def new_with_mutex(cls, *args, **kwargs):
            instance = user_new(cls, *args, **kwargs)
            # ``__new__`` may legitimately return a foreign object; only
            # instances of the managed class receive a lock.
            if isinstance(instance, cls):
                instance.__mutex = threading.RLock()
            return instance
        return new_with_mutex
class Deque(metaclass=AtomicMeta):
    """Pure-Python double-ended queue modelled on ``collections.deque``.

    Items are kept in a doubly linked list of ``Node`` objects whose two
    ends are referenced by ``_left_root`` and ``_right_root``.  ``_version``
    is incremented by every structural change so that iterators and
    searches can detect that the deque was mutated underneath them, and
    ``_guard`` (an ``IterationGuard``) counts the iteration and mutation
    sections that are currently active.
    """

    # __slots__ = (
    #     '__max_len',
    #     '_left_root',
    #     '_right_root',
    #     '__len',
    #     '_guard',
    #     '__repr',
    #     '_version'
    # )

    def __init__(self, iterable=None, maxlen=None):
        """Create a deque, optionally filled from ``iterable``.

        ``maxlen`` bounds the number of stored items; ``None`` means
        unbounded (stored internally as ``math.inf``).

        Raises ``TypeError`` if ``maxlen`` is neither ``None`` nor an int,
        and ``ValueError`` if it is negative.
        """
        if maxlen is not None:
            if not isinstance(maxlen, int):
                raise TypeError('max_len must be None or int')
            if maxlen < 0:
                raise ValueError('max_len must be >= zero')
            self.__max_len = maxlen
        else:
            self.__max_len = math.inf
        self._left_root = self._right_root = None
        self.__len = 0
        self._guard = IterationGuard(False)
        self.__repr = False  # True while __repr__ is running (recursion flag)
        self._version = 0
        if iterable is not None:
            self.extend(iterable)

    def append(self, x):
        """Add ``x`` on the right; drop the leftmost item if over ``maxlen``."""
        with self._guard.mute:
            if self._right_root is None:
                self._left_root = self._right_root = Node(x, None, None, self)
            else:
                self._right_root.append(x)
            self.__len += 1
            if self.__len > self.__max_len:
                self.popleft()
            self._version += 1

    def append_left(self, x):
        """Add ``x`` on the left; drop the rightmost item if over ``maxlen``."""
        with self._guard.mute:
            if self._left_root is None:
                self._left_root = self._right_root = Node(x, None, None, self)
            else:
                self._left_root.append_left(x)
            self.__len += 1
            if self.__len > self.__max_len:
                self.pop()
            self._version += 1

    appendleft = append_left

    def clear(self):
        """Remove all items."""
        with self._guard.mute:
            self._left_root = self._right_root = None
            self.__len = 0
            self._version += 1

    def copy(self):
        """Return a shallow copy with the same ``maxlen``.

        Raises ``RuntimeError`` if the deque changed while being copied.
        """
        with self._guard.iter:
            version = self._version
            result = type(self)(self, self.max_len)
            if self._version != version:
                raise RuntimeError()
            return result

    def count(self, x):
        """Return the number of items equal to ``x``.

        Raises ``RuntimeError`` if a comparison mutates the deque.
        """
        with self._guard.iter:
            version = self._version
            node, count = self._left_root, 0
            while node is not None:
                count += node.value == x
                node = node.right
                if self._version != version:
                    raise RuntimeError()
            return count

    def extend(self, iterable):
        """Append every item of ``iterable`` on the right."""
        # Extending with itself would chase its own tail; snapshot first.
        for item in (tuple(self) if iterable is self else iterable):
            self.append(item)

    def extend_left(self, iterable):
        """Append every item of ``iterable`` on the left (order reverses)."""
        for item in (tuple(self) if iterable is self else iterable):
            self.append_left(item)

    extendleft = extend_left

    def index(self, x, start=None, stop=None):
        """Return the position of the first item equal to ``x``.

        Only positions with ``start <= position < stop`` qualify; negative
        bounds count from the right.  Raises ``ValueError`` if there is no
        match and ``RuntimeError`` if the deque is mutated during the scan.
        """
        start = 0 if start is None else self.__get_index(start, False)
        stop = len(self) if stop is None else self.__get_index(stop, False)
        with self._guard.iter:
            version = self._version
            node = self._left_root
            for index in itertools.count():
                if self._version != version:
                    raise RuntimeError()
                if node is None:
                    raise ValueError('item could not be found')
                if node.value == x and start <= index < stop:
                    break
                node = node.right
            # The comparison that matched may itself have mutated the deque.
            if self._version != version:
                raise RuntimeError()
            return index

    def insert(self, i, x):
        """Insert ``x`` before position ``i``.

        Positions past the right end append; positions before the left end
        insert at the front.  Raises ``IndexError`` if the deque is already
        at ``maxlen``.
        """
        if len(self) == self.max_len:
            raise IndexError('cannot grow anymore')
        index = self.__get_index(i, False)
        if index >= len(self):
            self.append(x)
        else:
            if index < 0:
                index = 0
            node = self.__get_node(index)
            with self._guard.mute:
                if node.left is None:
                    self._left_root = node.left = Node(x, None, node, self)
                else:
                    # The right-hand side is built with the old neighbour
                    # first, then linked in from both sides.
                    node.left.right = node.left = Node(
                        x, node.left, node, self
                    )
                self.__len += 1
                self._version += 1

    def pop(self):
        """Remove and return the rightmost item (``IndexError`` if empty)."""
        if self._right_root is None:
            raise IndexError('pop from an empty deque')
        with self._guard.mute:
            value = self._right_root.value
            self._right_root = self._right_root.left
            if self._right_root is None:
                self._left_root = None
            else:
                self._right_root.right = None
            self.__len -= 1
            self._version += 1
            return value

    def popleft(self):
        """Remove and return the leftmost item (``IndexError`` if empty)."""
        if self._left_root is None:
            raise IndexError('pop from an empty deque')
        with self._guard.mute:
            value = self._left_root.value
            self._left_root = self._left_root.right
            if self._left_root is None:
                self._right_root = None
            else:
                self._left_root.left = None
            self.__len -= 1
            self._version += 1
            return value

    def remove(self, value):
        """Remove the first item equal to ``value``.

        Raises ``ValueError`` if no item matches and ``IndexError`` if the
        deque was mutated by the time the match is found.
        """
        node = self._left_root
        version = self._version
        while node is not None:
            if node.value == value:
                if self._version != version:
                    raise IndexError()
                break
            else:
                node = node.right
        else:
            raise ValueError('value could not be found')
        self.__del_node(node)

    def reverse(self):
        """Reverse the order of the items in place."""
        if self._left_root is not None:
            with self._guard.mute:
                self._left_root, self._right_root = \
                    self._right_root, self._left_root
                node = self._left_root
                while node is not None:
                    # After the swap ``right`` holds the former left
                    # neighbour, which is the next node in the new order.
                    node.left, node.right = node.right, node.left
                    node = node.right
                self._version += 1

    def rotate(self, n=1):
        """Rotate the items ``n`` steps to the right (left if negative)."""
        size = len(self)
        if size > 0:
            with self._guard.mute:
                # ``n % size`` maps a left rotation onto the equivalent
                # right rotation.
                for _ in range(n % size):
                    self.append_left(self.pop())
                self._version += 1

    # Support for the following members is left as an exercise for the
    # reader.

    @property
    def max_len(self):
        """Maximum number of items, or ``None`` when unbounded."""
        return None if self.__max_len == math.inf else self.__max_len

    maxlen = max_len

    def __iter__(self):
        return DequeIter(self)

    def __getstate__(self):
        return (
            self.__max_len,
            self._left_root,
            self._right_root,
            self.__len,
            self._guard,
            self.__repr,
            self._version
        )

    def __setstate__(self, state):
        self.__max_len, self._left_root, self._right_root, self.__len, \
            self._guard, self.__repr, self._version = state

    def __len__(self):
        return self.__len

    def __reversed__(self):
        return ReverseIter(self)

    def __copy__(self):
        return self.copy()

    def __deepcopy__(self, memo):
        """Return a deep copy that keeps ``maxlen``.

        The copy is registered in ``memo`` before the items are copied so
        that a deque which (indirectly) contains itself is copied once
        instead of recursing without end.
        """
        clone = type(self)((), self.max_len)
        memo[id(self)] = clone
        for item in self:
            clone.append(copy.deepcopy(item, memo))
        return clone

    def __contains__(self, item):
        """Return whether an item equal to ``item`` is stored.

        Raises ``RuntimeError`` if a comparison mutates the deque.
        """
        with self._guard.iter:
            version = self._version
            node = self._left_root
            while node is not None:
                if node.value == item:
                    return True
                node = node.right
                if self._version != version:
                    raise RuntimeError()
            return False

    def __getitem__(self, key):
        index = self.__get_index(key)
        return self.__get_node(index).value

    def __setitem__(self, key, value):
        index = self.__get_index(key)
        node = self.__get_node(index)
        node.value = value

    def __delitem__(self, key):
        index = self.__get_index(key)
        node = self.__get_node(index)
        with self._guard.mute:
            self.__del_node(node)
            self._version += 1

    def __get_index(self, key, check=True):
        """Normalise a possibly negative index.

        With ``check`` true an out-of-range result raises ``IndexError``;
        otherwise the (possibly out-of-range) value is returned as is.
        """
        if key < 0:
            key = len(self) + key
        if check and (key < 0 or key >= len(self)):
            raise IndexError('index is out of range')
        return key

    def __get_node(self, index):
        """Return the node at ``index``, walking from the nearer end."""
        with self._guard.iter:
            version = self._version
            halfway = len(self) >> 1
            if index < halfway:
                node = self._left_root
                for _ in range(index):
                    node = node.right
                return node
            node = self._right_root
            for _ in range(len(self) - index - 1):
                node = node.left
            if self._version != version:
                raise RuntimeError()
            return node

    def __del_node(self, node):
        """Unlink ``node`` from the list and update the bookkeeping."""
        with self._guard.mute:
            if node.left is None:
                self._left_root = node.right
            else:
                node.left.right = node.right
            if node.right is None:
                self._right_root = node.left
            else:
                node.right.left = node.left
            self.__len -= 1
            self._version += 1

    # The following methods are for extra credit.

    def __add__(self, other):
        if not isinstance(other, type(self)):
            raise TypeError(f'other must be of type {type(self).__name__}')
        total = self.copy()
        total.extend(other)
        return total

    def __mul__(self, other):
        """Return a new deque holding the items repeated ``other`` times.

        The result always keeps this deque's ``maxlen``, including the
        empty result produced for factors below one.
        """
        if other < 1:
            return type(self)((), self.max_len)
        product = self.copy()
        for _ in range(other - 1):
            product.extend(self)
        return product

    __rmul__ = __mul__

    def __imul__(self, other):
        if other < 1:
            self.clear()
        if other > 1:
            state = tuple(self)
            for _ in range(other - 1):
                self.extend(state)
        return self

    # This should be helpful while trying to debug the code.

    def __repr__(self):
        """Return ``deque([...])``, or ``'[...]'`` for a recursive reference."""
        if self.__repr:
            return '[...]'
        self.__repr = True
        try:
            ml = "" if self.maxlen is None else ", maxlen=" + str(self.maxlen)
            return f'{type(self).__name__.lower()}({list(self)}{ml})'
        finally:
            # Reset even if building the list raised (e.g. on mutation),
            # otherwise every later repr() would report '[...]'.
            self.__repr = False

    # Testing requires a few more methods.

    def __compare(self, other, op):
        """Lexicographically compare with another ``Deque`` using ``op``.

        The first pair of differing items decides; when one deque is a
        prefix of the other the lengths decide.  Returns ``NotImplemented``
        for operands that are not ``Deque`` instances.
        """
        if not isinstance(other, Deque):
            return NotImplemented
        for mine, theirs in zip(self, other):
            if not (mine is theirs or mine == theirs):
                return op(mine, theirs)
        return op(len(self), len(other))

    def __lt__(self, other):
        return self.__compare(other, operator.lt)

    def __le__(self, other):
        return self.__compare(other, operator.le)

    def __eq__(self, other):
        return type(self) == type(other) and \
            len(self) == len(other) and \
            all(itertools.starmap(operator.eq, zip(self, other)))

    def __ne__(self, other):
        return type(self) != type(other) or \
            len(self) != len(other) or \
            any(itertools.starmap(operator.ne, zip(self, other)))

    def __gt__(self, other):
        return self.__compare(other, operator.gt)

    def __ge__(self, other):
        return self.__compare(other, operator.ge)

    def __iadd__(self, other):
        self.extend(other)
        return self
# Lowercase alias for ``Deque``.
deque = Deque
class Node:
    """A single link of the doubly linked list used by the deque.

    ``value`` is the stored item, ``left``/``right`` are the neighbouring
    nodes (``None`` at an end) and ``parent`` is the owning container whose
    ``_left_root``/``_right_root`` attributes are updated when a node is
    added at an end.
    """

    __slots__ = 'value', 'left', 'right', 'parent'

    def __init__(self, value, left, right, parent):
        self.value, self.left = value, left
        self.right, self.parent = right, parent

    def __repr__(self):
        left_value = None if self.left is None else self.left.value
        right_value = None if self.right is None else self.right.value
        class_name = type(self).__name__
        return (
            f'{class_name}('
            f'{self.value!r}, {left_value!r}, {right_value!r}, parent)'
        )

    def append(self, x):
        """Link a new node holding ``x`` to the right and make it the
        parent's right root."""
        owner = self.parent
        fresh = type(self)(x, self, None, owner)
        self.right = owner._right_root = fresh

    def append_left(self, x):
        """Link a new node holding ``x`` to the left and make it the
        parent's left root."""
        owner = self.parent
        fresh = type(self)(x, None, self, owner)
        self.left = owner._left_root = fresh

    def __getstate__(self):
        state = (self.value, self.left, self.right, self.parent)
        return state

    def __setstate__(self, state):
        value, left, right, parent = state
        self.value = value
        self.left = left
        self.right = right
        self.parent = parent
class DequeIter:
    """Left-to-right iterator over the nodes of a deque.

    The iterator remembers the parent's ``_version`` at creation time and
    raises ``RuntimeError`` from ``__next__`` once that number has changed.
    For a non-empty parent the ``iter`` guard is entered on creation and
    left again when the last item is handed out.
    """

    __slots__ = 'parent', 'node', 'guard', 'version'

    def __init__(self, parent):
        first = parent._left_root
        iter_guard = parent._guard.iter
        self.parent = parent
        self.node = first
        self.guard = iter_guard
        self.version = parent._version
        if first is None:
            # Nothing to iterate over, so the guard is never entered.
            return
        iter_guard.__enter__()

    def __iter__(self):
        return self

    def __next__(self):
        if self.parent._version != self.version:
            raise RuntimeError()
        current = self.node
        if current is None:
            raise StopIteration()
        following = current.right
        self.node = following
        if following is None:
            # The item being returned is the last one: release the guard.
            self.guard.__exit__(None, None, None)
        return current.value

    def __getstate__(self):
        state = (self.parent, self.node, self.guard, self.version)
        return state

    def __setstate__(self, state):
        parent, node, guard, version = state
        self.parent = parent
        self.node = node
        self.guard = guard
        self.version = version
class ReverseIter(DequeIter):
    """Right-to-left iterator over a deque.

    Works on a reversed copy of the given deque, so the items seen are the
    ones present when the iterator was created.
    """

    __slots__ = ()

    def __init__(self, parent):
        mirrored = parent.copy()
        mirrored.reverse()
        super().__init__(mirrored)
class IterationGuard:
    """Pair of linked ``IGAttr`` counters, exposed as ``iter`` and ``mute``.

    Each counter references the other one through its ``other`` attribute.
    """

    __slots__ = '__iter', '__mute'

    def __init__(self, debug):
        reading = IGAttr('iter', debug)
        writing = IGAttr('mute', debug)
        reading.other, writing.other = writing, reading
        self.__iter, self.__mute = reading, writing

    @property
    def iter(self):
        """Counter created with the name ``'iter'``."""
        return self.__iter

    @property
    def mute(self):
        """Counter created with the name ``'mute'``."""
        return self.__mute

    def __getstate__(self):
        state = (self.__iter, self.__mute)
        return state

    def __setstate__(self, state):
        reading, writing = state
        self.__iter = reading
        self.__mute = writing
class IGAttr:
    """Nesting counter usable as a context manager.

    ``level`` goes up on entry and down on exit.  Entering raises
    ``RuntimeError`` while the paired counter (``other``) has a positive
    level, unless that paired counter is the one named ``'iter'``.  With
    ``debug`` set, every change of ``level`` is printed.
    """

    __slots__ = 'name', 'debug', 'other', 'level'

    def __init__(self, name, debug):
        self.name, self.debug = name, debug
        self.other, self.level = None, 0

    def __enter__(self):
        peer = self.other
        blocked = peer.level > 0 and peer.name != 'iter'
        if blocked:
            raise RuntimeError(f'{self.name} is not allowed right now')
        self.level += 1
        self._report()

    def __exit__(self, exc_type, exc_value, traceback):
        self.level -= 1
        self._report()

    def _report(self):
        """Print the current level when debugging is switched on."""
        if not self.debug:
            return
        print(f'{self.name} = {self.level}')

    def __getstate__(self):
        state = (self.name, self.debug, self.other, self.level)
        return state

    def __setstate__(self, state):
        name, debug, other, level = state
        self.name = name
        self.debug = debug
        self.other = other
        self.level = level
class TestHelp:
    """Small stand-ins for two ``unittest.TestCase`` assertion helpers.

    Failures are signalled by raising ``AssertionError`` explicitly rather
    than through ``assert`` statements, so the checks keep working when
    Python runs with ``-O`` (which strips ``assert``).
    """

    @staticmethod
    def assertEqual(first, second, msg=None):
        """Raise ``AssertionError(msg)`` unless ``first == second``."""
        if not first == second:
            raise AssertionError(msg)

    @staticmethod
    def assertRaises(exception, callable=None, *args, **kwargs):
        """Check that ``callable(*args, **kwargs)`` raises ``exception``.

        Without ``callable`` an ``ExceptionChecker`` context manager for
        ``exception`` is returned instead.

        Raises ``AssertionError`` if the call raises a different
        ``Exception`` or returns normally.  Exceptions that do not derive
        from ``Exception`` (``KeyboardInterrupt``, ``SystemExit``) are
        allowed to propagate unchanged.
        """
        if callable is None:
            return ExceptionChecker(exception)
        try:
            callable(*args, **kwargs)
        except exception:
            return None
        except Exception as error:
            raise AssertionError(
                f'expected {exception!r}, got {type(error).__name__}'
            ) from error
        raise AssertionError(f'{exception!r} was not raised')
class ExceptionChecker:
    """Context manager asserting that its body raises a given exception.

    ``exception`` is an exception class or a tuple of classes.  An expected
    exception is swallowed, any other exception propagates unchanged, and a
    body that finishes without raising fails with ``AssertionError``
    (previously that case passed silently).
    """

    def __init__(self, exception):
        self.exc = exception

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            raise AssertionError(f'{self.exc!r} was not raised')
        # A true return value suppresses the exception; a false one lets
        # the unexpected exception continue to propagate.
        return issubclass(exc_type, self.exc)
# Run the test driver only when the file is executed as a script.
if __name__ == '__main__':
 main()
200_success
145k22 gold badges190 silver badges478 bronze badges
asked Oct 27, 2017 at 14:48
\$\endgroup\$
0

3 Answers 3

2
\$\begingroup\$

Just reviewing the AtomicMeta class.

1. Review

  1. There's no docstring. What is the class supposed to do?

  2. There are two mechanisms for initializing the __mutex attribute: one in the atomic_new method, and another in the wrap method. Why two mechanisms? Is one of them somehow unreliable? But if it is, then why not just use the other? I don't understand this, so have I missed a requirement?

  3. AtomicMeta does not support classes that have __slots__:

    >>> class Test1(metaclass=AtomicMeta):
    ... __slots__ = ['slot']
    ...
    >>> Test1()
    Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "cr178957.py", line 31, in atomic_new
     instance.__mutex = threading.RLock()
    AttributeError: 'Test1' object has no attribute '_AtomicMeta__mutex'
    

    To fix this, the __new__ method needs:

    if '__slots__' in class_dict:
     class_dict['__slots__'] += ('_AtomicMeta__mutex',)
    
  4. AtomicMeta does not support classes that have a __new__ method:

    >>> class Test2(metaclass=AtomicMeta):
    ... def __new__(cls, value):
    ... instance = super().__new__(cls)
    ... instance.attr = value
    ... return instance
    ...
    >>> Test2('value').attr
    Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
    AttributeError: 'Test2' object has no attribute 'attr'
    

    The simplest approach for supporting this would seem to be to drop the atomic_new method (relying on the wrap method to initialize the __mutex attribute, as discussed above), and ensure that the __new__ method does not get wrapped:

    if key != '__new__' and callable(value):
     class_dict[key] = mcs.wrap(value)
    
  5. The __init__ method also does not need to be wrapped, since at the point where __init__ is called, only one thread has access to the object. So this test can become:

    if key not in ('__init__', '__new__') and callable(value):
     class_dict[key] = mcs.wrap(value)
    
  6. Since wrap is called from only one place, it would be simpler to inline it at the point of use.

2. Revised code

import functools
import threading
class AtomicMeta(type):
    "Metaclass that serializes access to the methods of each instance."

    def __new__(mcs, name, bases, class_dict):
        """Create the class with every method wrapped by the instance lock.

        ``__init__`` and ``__new__`` are left unwrapped.  If the class
        declares ``__slots__``, a slot for the lock is added to them.
        """
        # Iterate over a snapshot because entries are rebound in the loop.
        for key, value in list(class_dict.items()):
            if key not in ('__init__', '__new__') and callable(value):
                class_dict[key] = mcs._serialized(value)
        if '__slots__' in class_dict:
            slots = class_dict['__slots__']
            # A bare string declares a single slot; tuples, lists and other
            # iterables are normalised to a tuple before extending.
            if isinstance(slots, str):
                slots = (slots,)
            slots = tuple(slots)
            if '_AtomicMeta__mutex' not in slots:
                slots += ('_AtomicMeta__mutex',)
            class_dict['__slots__'] = slots
        return super().__new__(mcs, name, bases, class_dict)

    @staticmethod
    def _serialized(func):
        """Return ``func`` wrapped so that it runs under the instance lock.

        Building the wrapper in its own function binds ``func`` once per
        method; defining the closure directly inside the loop would make
        every wrapper call whatever the loop variable referred to last.
        """
        @functools.wraps(func)
        def atomic(self, *args, **kwargs):
            try:
                mutex = self._AtomicMeta__mutex
            except AttributeError:
                # First locked call on this instance: create its lock.
                mutex = self._AtomicMeta__mutex = threading.RLock()
            with mutex:
                return func(self, *args, **kwargs)
        return atomic
answered Mar 14, 2018 at 13:53
\$\endgroup\$
2
\$\begingroup\$

The biggest problem I see with this whole thing, and the reason your code hasn't gotten much attention, is the lack of comments. In particular comments about why you're doing something a certain way, or what the purpose (goal) of a section of code is. This makes it cumbersome for someone looking over the code to give the kind of high-level analysis you're asking for. Putting some docstrings in (even though the interface is the same as a standard library) would also help out people like me who sometimes forget the difference between append and extend.

  • What's the reason for maintaining version in the deque itself? I don't understand the Atomic stuff, I guess that's supposed to keep it threadsafe, but shouldn't that take care of things without needing version? You really need some comments here.
  • Is there any way to test if the atomicity actually works?
  • Is an unexplained RuntimeError the best exception to raise if you detect a failure of atomicity using version?
  • The exceptions for __init__() refer to max_len but the named parameter is maxlen
  • Using a temporary tuple when duplicating the deque in extend() is quick to write but inefficient. Shouldn't it be possible to keep track using pointers?
  • In index(), I'm not sure if re-using start and stop that way is easy enough to understand. I don't think it hurts anything to create a separate variable, and name them in keeping with their actual content
  • Since the start <= index < stop test will almost always be quicker than the equality test, shouldn't it be first?
  • Does the reference implementation actually require an IndexError exception if you try to add to a full deque? That seems counterintuitive.
  • Isn't the last half of insert the same as append_left?
  • In the functions that come in left/right pairs, isn't there some way to combine them so the code is not duplicated?
  • I'm pretty sure there's a better implementation of rotate() that only requires patching four pointers.
  • Is the "exercise for the reader" comment supposed to be there?
answered Mar 14, 2018 at 13:43
\$\endgroup\$
1
\$\begingroup\$

I guess I'd add an alias for "the other side" everywhere a method name is decorated left or right, the way the library grew a bisect_right().
For reverse(), you could swap accessors (including, depending on implementation details, iterators) instead of references - better yet, create a class with swapped accessors and mutate the object on reverse().
What about __str__()?

answered Jan 13, 2018 at 11:32
\$\endgroup\$
0

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.