3
\$\begingroup\$

Python 3 has queue.PriorityQueue, but that wasn't good enough (lacking methods, not double-ended, etc) for me and my purposes. Therefore, I have made a new one based on a dictionary of collection.deques.

I am well aware that this is a naive implementation, because it uses 5 times as many collections.deques as a smart Python implementation would. This is mostly because I couldn't figure out how to simply implement efficient search (for finding ranges of one priority in the deque) without looking at almost every element.

This is supposed to be thread-safe (as far as that can go in CPython) but it also doesn't seem to break when shared between PyPy stackless tasklets.

There are a finite number of priorities a thing can have. This is what is meant by "absolute priorties". The priority of an entry is meaningful without considering the priorities of other objects in the queue.

Compare this with the Linux kernel, whose processes are given a number from -19 (most nice) to 20 (least nice) which determines how they will be scheduled with respect to one another. This is effective because, like any task scheduler, the kernel will probably come back to run every process at some point.

However, the priority_deque is designed not for "re-entrant access" to each entry like a long-running process scheduler needs, but rather one-time access to the ends of the deque(s).

And so with absolute priorities the five-section priority_deque acts somewhat like a contiguous datastructure, in that its most privileged entries are preferred (by default) over the least privileged ones.

Note finally that the highest priority deques are of very finite size. Normally, this would result in the oldest (and arguably most important) entries being shoved off the other end of the deque to make room for the new entries (as the maxlen argument normally does).

This implementation uses deque.maxlen to prevent losing important entries and instead scheduling new entries at the top end of a longer, less important deque.

GitHub

prioritydeque.py

import enum
@enum.unique
class priority(enum.IntEnum):
 undef, low, normal, high, airmail = range(-1, 4)
 def describe(self):
 return self.value, self.name
 @classmethod
 def min(cls):
 return min(
 [e.describe() for e in list(cls)],
 key=lambda x: x[0]
 )
 @classmethod
 def max(cls):
 return max(
 [e.describe() for e in list(cls)],
 key=lambda x: x[0]
 )
DEFAULT_MAXLENS = {
 priority.undef: None,
 priority.low: None,
 priority.normal: None,
 priority.high: 50,
 priority.airmail: 10
}
def get_maxlen(params, key):
 return params.get(key, DEFAULT_MAXLENS[key])
class priority_deque():
 """
 Base class for priority deque objects.
 """
 @staticmethod
 def default_nice_sorter(nices):
 return sorted(nices, key=lambda x: x.value, reverse=True)
 @staticmethod
 def random_nice_sorter(nices):
 import random
 return random.shuffle(nices, len(nices))
 def __init__(self, *args, **kwargs):
 """
 params: priority_enum ** (an alternative enum class with the same
 member names as the priority class)
 retval: a blank priority_deque
 raises: AttributeError if priority_enum has an unexpected set
 of names
 purity: yes
 Create an empty priority deque.
 """
 import threading
 from collections import deque
 self.prty = priority
 if "priority_enum" in kwargs:
 self.prty = kwargs["priority_enum"]
 self._pool = {
 self.prty.undef:
 deque(maxlen=get_maxlen(kwargs, self.prty.undef)),
 self.prty.low:
 deque(maxlen=get_maxlen(kwargs, self.prty.low)),
 self.prty.normal:
 deque(maxlen=get_maxlen(kwargs, self.prty.normal)),
 self.prty.high:
 deque(maxlen=get_maxlen(kwargs, self.prty.high)),
 self.prty.airmail:
 deque(maxlen=get_maxlen(kwargs, self.prty.airmail))
 }
 self.lock = threading.Lock()
 def push(
 self, obj, want_nice=None, force=False,
 want_push_func=lambda q, o: q.appendleft(o),
 settle_push_func=lambda q, o: q.append(o)
 ):
 """
 params: obj (an object)
 want_nice ** (a priority; default: self.prty.normal)
 force ** (a bool; default: false)
 want_push_func ** (a function q, o -> None;
 default: appendleft)
 settle_push_func ** (a function q, o -> None;
 default: append)
 retval: None (a NoneType)
 nice (a priority; the priority that obj ended up with)
 raises: KeyError if nice is not a key in self.prty (that is, it
 is not a key in self._pool)
 purity: relative
 Add a new entry to the pool, with the maximum priority of nice.
 The entry may end up with a lower priority because all the other
 deques were full.
 obj can be pushed to the top (right side) of a deque by specifying
 push_func like (lambda q, o: q.append(o)).
 If the preferred nice value want_nice is full and force=False,
 settle_push_func will be used to "settle for" a lower nice
 value.
 By default, this secondary function pushes to the top of the next
 lowest priority.
 If force=False, this method is not destructive; it will try to
 push on a deque in the pool which is not full.
 To force pushing an object into a specific priority even if they
 are full, set force=True.
 """
 import time
 if want_nice is None:
 want_nice = self.prty.normal
 if force or self._can_push(want_nice):
 time.sleep(0)
 with self.lock:
 return want_push_func(self._pool[want_nice], obj), want_nice
 # start from the highest priority and go down
 nices = range(want_nice, priority.min()[0])
 for nice in nices:
 # nice != want_nice
 time.sleep(0)
 if self._can_push(nice):
 with self.lock:
 return settle_push_func(self._pool[nice], obj), nice
 def pop(
 self, force_nice=(False, None),
 nice_sorter=None, pop_func=lambda q: q.pop()
 ):
 """
 params: force_nice ** (a pair<bool, priority>;
 default: (Force, None))
 nice_sorter ** (a function n -> s;
 default: priority_deque.default_nice_sorter)
 pop_func ** (a function q -> o; default: pop)
 retval: obj (an object)
 nice (a priority; the priority obj had)
 raises: KeyError if force_nice isn't long enough
 KeyError if force_nice[1] is not a key in self.prty
 purity: relative
 Remove an entry from the pool.
 By default, looks for the highest-priority items first.
 The priority of the resulting object is returned alongside it.
 If no object was found, an object of None and a priority of None
 are returned.
 The deques are sorted by nice_sorter, and the highest-priority non-
 empty deque is popped from with pop_func.
 To look for lower priorities first, use a function which does not
 reverse-sort the priority list.
 To use a random priority, use self.random_nice_sorter
 To pop from a specific priority, use force_nice=(True, nice).
 This will return an object or None (if the priority was empty) and
 the provided priority.
 """
 import time
 if nice_sorter is None:
 nice_sorter = self.default_nice_sorter
 if force_nice[0]:
 time.sleep(0)
 with self.lock:
 return pop_func(self._pool[ force_nice[1] ]), force_nice[1]
 nices = self._sort_pool(nice_sorter)
 for nice in nices:
 time.sleep(0)
 dq = self._pool[nice]
 if len(dq):
 with self.lock:
 return pop_func(dq), nice
 return None, None
 def peek(
 self, force_nice=(False, None),
 nice_sorter=None, peek_func=lambda q: q[-1]
 ):
 """
 params: force_nice ** (a pair<bool, priority>;
 default: (Force, None))
 nice_sorter ** (a function n -> s;
 default: priority_deque.default_nice_sorter)
 pop_func ** (a function q -> o;
 default: lambda q: q[-1])
 retval: obj (an object)
 nice (a priority; the priority obj has)
 raises: KeyError if force_nice isn't long enough
 KeyError if force_nice[1] is not a key in self.prty
 purity: relative
 View an entry in the pool.
 """
 if nice_sorter is None:
 nice_sorter = self.default_nice_sorter
 if force_nice[0]:
 with self.lock:
 return peek_func(self._pool[ force_nice[1] ]), force_nice[1]
 return self.pop(nice_sorter=nice_sorter, pop_func=peek_func)
 def clear1(self, nice):
 dq = self._pool[nice].copy()
 with self.lock:
 self._pool[nice].clear()
 return dq
 def clear(self):
 pool = self._pool.copy()
 for nice in self.prty:
 with self.lock:
 self._pool[nice].clear()
 return pool
 def _sort_pool(self, nice_sorter=default_nice_sorter):
 return nice_sorter( self.prty )
 def _can_push(self, nice):
 if self._pool[nice].maxlen is None:
 return True
 return len( self._pool[nice] ) < self._pool[nice].maxlen
 def __repr__(self):
 return repr(self._pool)
Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Jan 20, 2018 at 20:44
\$\endgroup\$

1 Answer 1

2
\$\begingroup\$

Just reviewing the docstring for the push method:

 params: obj (an object)
 want_nice ** (a priority; default: self.prty.normal)
 force ** (a bool; default: false)
 want_push_func ** (a function q, o -> None;
 default: appendleft)
 settle_push_func ** (a function q, o -> None;
 default: append)
 retval: None (a NoneType)
 nice (a priority; the priority that obj ended up with)
 raises: KeyError if nice is not a key in self.prty (that is, it
 is not a key in self._pool)
 purity: relative
 Add a new entry to the pool, with the maximum priority of nice.
 The entry may end up with a lower priority because all the other
 deques were full.
 obj can be pushed to the top (right side) of a deque by specifying
 push_func like (lambda q, o: q.append(o)).
 If the preferred nice value want_nice is full and force=False,
 settle_push_func will be used to "settle for" a lower nice
 value.
 By default, this secondary function pushes to the top of the next
 lowest priority.
 If force=False, this method is not destructive; it will try to
 push on a deque in the pool which is not full.
 To force pushing an object into a specific priority even if they
 are full, set force=True.

When you find yourself writing a docstring like this, you ought to hear a little voice saying, "hang on a second, this seems awfully complicated!" Complexity in interfaces is costly — it requires time for programmers to understand the interface, it adds maintenance burden, and it increases the number of bugs.

Those are my general concerns, but I have some specific concerns too:

  1. The function returns a tuple whose first element is None and whose second element is the priority. It would surely be more convenient for the caller if you just returned the priority.
  2. The name want_nice is the preferred priority for the object, so it would be clearer if this name included the word priority somewhere.
  3. If force=True, then it looks as though objects will be silently discarded when a priority queue is full. This seems likely to be the cause of hard-to-find bugs in applications using the class. Is it really a good idea to provide this flag?
  4. Even when force=False, it looks as though the object might silently fail to be pushed when all priority queues are full. Again, this seems likely to be the cause of bugs. Failures need to be reported to the caller, for example by raising an exception.
  5. There is no documentation for the parameter want_push_func. What does this do? Presumably the mention of push_func is a typo for want_push_func.
  6. Presumably the intention is that there are just two valid values for want_push_func, namely lambda q, o: q.appendleft(o) and lambda q, o: q.append(o). But if this is the case, it should be documented and not just left for the reader to guess.
  7. The want_push_func interface seems inconvenient: surely it would be better for the priority_deque class to have two methods for these two cases? Or if you must have one method with two behaviours, take a flag or an enum instead of a function?
  8. The same comments apply to settle_push_func: this also needs to be one of two functions, so it really ought to be a flag or enum.
  9. settle_push_func looks like a bug magnet to me. It is only called in rare circumstances (when a priority queue is full) and so if you made an mistake in writing it then you might not discover the mistake in testing, because it might not be called by your test cases.
  10. I can't help but think that in a priority queue system, it shouldn't be necessary to have special mechanisms for pushing on to the front of the queue — if the caller has a higher-priority item, they ought to be able to specify a higher priority. It looks to me as though the design, with a fixed set of priorities and a fixed number of objects at each priority, has created artificial limitations that you then have to try to work around, which leads to complexity in the interface and implementation. When you find yourself in this situation, it's worth taking a step back and trying to see if your design is really meeting your requirements.
mkrieger1
1,7841 gold badge14 silver badges26 bronze badges
answered Jan 24, 2018 at 10:40
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.