Python 3 has queue.PriorityQueue
, but that wasn't good enough (lacking methods, not double-ended, etc) for me and my purposes. Therefore, I have made a new one based on a dictionary of collection.deque
s.
I am well aware that this is a naive implementation, because it uses 5 times as many collections.deque
s as a smart Python implementation would. This is mostly because I couldn't figure out how to simply implement efficient search (for finding ranges of one priority in the deque) without looking at almost every element.
This is supposed to be thread-safe (as far as that can go in CPython) but it also doesn't seem to break when shared between PyPy stackless
tasklets.
There are a finite number of priorities a thing can have. This is what is meant by "absolute priorties". The priority of an entry is meaningful without considering the priorities of other objects in the queue.
Compare this with the Linux kernel, whose processes are given a number from -19 (most nice) to 20 (least nice) which determines how they will be scheduled with respect to one another. This is effective because, like any task scheduler, the kernel will probably come back to run every process at some point.
However, the priority_deque
is designed not for "re-entrant access" to each entry like a long-running process scheduler needs, but rather one-time access to the ends of the deque(s).
And so with absolute priorities the five-section priority_deque
acts somewhat like a contiguous datastructure, in that its most privileged entries are preferred (by default) over the least privileged ones.
Note finally that the highest priority deque
s are of very finite size. Normally, this would result in the oldest (and arguably most important) entries being shoved off the other end of the deque
to make room for the new entries (as the maxlen
argument normally does).
This implementation uses deque.maxlen
to prevent losing important entries and instead scheduling new entries at the top end of a longer, less important deque
.
prioritydeque.py
import enum
@enum.unique
class priority(enum.IntEnum):
undef, low, normal, high, airmail = range(-1, 4)
def describe(self):
return self.value, self.name
@classmethod
def min(cls):
return min(
[e.describe() for e in list(cls)],
key=lambda x: x[0]
)
@classmethod
def max(cls):
return max(
[e.describe() for e in list(cls)],
key=lambda x: x[0]
)
DEFAULT_MAXLENS = {
priority.undef: None,
priority.low: None,
priority.normal: None,
priority.high: 50,
priority.airmail: 10
}
def get_maxlen(params, key):
return params.get(key, DEFAULT_MAXLENS[key])
class priority_deque():
"""
Base class for priority deque objects.
"""
@staticmethod
def default_nice_sorter(nices):
return sorted(nices, key=lambda x: x.value, reverse=True)
@staticmethod
def random_nice_sorter(nices):
import random
return random.shuffle(nices, len(nices))
def __init__(self, *args, **kwargs):
"""
params: priority_enum ** (an alternative enum class with the same
member names as the priority class)
retval: a blank priority_deque
raises: AttributeError if priority_enum has an unexpected set
of names
purity: yes
Create an empty priority deque.
"""
import threading
from collections import deque
self.prty = priority
if "priority_enum" in kwargs:
self.prty = kwargs["priority_enum"]
self._pool = {
self.prty.undef:
deque(maxlen=get_maxlen(kwargs, self.prty.undef)),
self.prty.low:
deque(maxlen=get_maxlen(kwargs, self.prty.low)),
self.prty.normal:
deque(maxlen=get_maxlen(kwargs, self.prty.normal)),
self.prty.high:
deque(maxlen=get_maxlen(kwargs, self.prty.high)),
self.prty.airmail:
deque(maxlen=get_maxlen(kwargs, self.prty.airmail))
}
self.lock = threading.Lock()
def push(
self, obj, want_nice=None, force=False,
want_push_func=lambda q, o: q.appendleft(o),
settle_push_func=lambda q, o: q.append(o)
):
"""
params: obj (an object)
want_nice ** (a priority; default: self.prty.normal)
force ** (a bool; default: false)
want_push_func ** (a function q, o -> None;
default: appendleft)
settle_push_func ** (a function q, o -> None;
default: append)
retval: None (a NoneType)
nice (a priority; the priority that obj ended up with)
raises: KeyError if nice is not a key in self.prty (that is, it
is not a key in self._pool)
purity: relative
Add a new entry to the pool, with the maximum priority of nice.
The entry may end up with a lower priority because all the other
deques were full.
obj can be pushed to the top (right side) of a deque by specifying
push_func like (lambda q, o: q.append(o)).
If the preferred nice value want_nice is full and force=False,
settle_push_func will be used to "settle for" a lower nice
value.
By default, this secondary function pushes to the top of the next
lowest priority.
If force=False, this method is not destructive; it will try to
push on a deque in the pool which is not full.
To force pushing an object into a specific priority even if they
are full, set force=True.
"""
import time
if want_nice is None:
want_nice = self.prty.normal
if force or self._can_push(want_nice):
time.sleep(0)
with self.lock:
return want_push_func(self._pool[want_nice], obj), want_nice
# start from the highest priority and go down
nices = range(want_nice, priority.min()[0])
for nice in nices:
# nice != want_nice
time.sleep(0)
if self._can_push(nice):
with self.lock:
return settle_push_func(self._pool[nice], obj), nice
def pop(
self, force_nice=(False, None),
nice_sorter=None, pop_func=lambda q: q.pop()
):
"""
params: force_nice ** (a pair<bool, priority>;
default: (Force, None))
nice_sorter ** (a function n -> s;
default: priority_deque.default_nice_sorter)
pop_func ** (a function q -> o; default: pop)
retval: obj (an object)
nice (a priority; the priority obj had)
raises: KeyError if force_nice isn't long enough
KeyError if force_nice[1] is not a key in self.prty
purity: relative
Remove an entry from the pool.
By default, looks for the highest-priority items first.
The priority of the resulting object is returned alongside it.
If no object was found, an object of None and a priority of None
are returned.
The deques are sorted by nice_sorter, and the highest-priority non-
empty deque is popped from with pop_func.
To look for lower priorities first, use a function which does not
reverse-sort the priority list.
To use a random priority, use self.random_nice_sorter
To pop from a specific priority, use force_nice=(True, nice).
This will return an object or None (if the priority was empty) and
the provided priority.
"""
import time
if nice_sorter is None:
nice_sorter = self.default_nice_sorter
if force_nice[0]:
time.sleep(0)
with self.lock:
return pop_func(self._pool[ force_nice[1] ]), force_nice[1]
nices = self._sort_pool(nice_sorter)
for nice in nices:
time.sleep(0)
dq = self._pool[nice]
if len(dq):
with self.lock:
return pop_func(dq), nice
return None, None
def peek(
self, force_nice=(False, None),
nice_sorter=None, peek_func=lambda q: q[-1]
):
"""
params: force_nice ** (a pair<bool, priority>;
default: (Force, None))
nice_sorter ** (a function n -> s;
default: priority_deque.default_nice_sorter)
pop_func ** (a function q -> o;
default: lambda q: q[-1])
retval: obj (an object)
nice (a priority; the priority obj has)
raises: KeyError if force_nice isn't long enough
KeyError if force_nice[1] is not a key in self.prty
purity: relative
View an entry in the pool.
"""
if nice_sorter is None:
nice_sorter = self.default_nice_sorter
if force_nice[0]:
with self.lock:
return peek_func(self._pool[ force_nice[1] ]), force_nice[1]
return self.pop(nice_sorter=nice_sorter, pop_func=peek_func)
def clear1(self, nice):
dq = self._pool[nice].copy()
with self.lock:
self._pool[nice].clear()
return dq
def clear(self):
pool = self._pool.copy()
for nice in self.prty:
with self.lock:
self._pool[nice].clear()
return pool
def _sort_pool(self, nice_sorter=default_nice_sorter):
return nice_sorter( self.prty )
def _can_push(self, nice):
if self._pool[nice].maxlen is None:
return True
return len( self._pool[nice] ) < self._pool[nice].maxlen
def __repr__(self):
return repr(self._pool)
1 Answer 1
Just reviewing the docstring for the push
method:
params: obj (an object)
want_nice ** (a priority; default: self.prty.normal)
force ** (a bool; default: false)
want_push_func ** (a function q, o -> None;
default: appendleft)
settle_push_func ** (a function q, o -> None;
default: append)
retval: None (a NoneType)
nice (a priority; the priority that obj ended up with)
raises: KeyError if nice is not a key in self.prty (that is, it
is not a key in self._pool)
purity: relative
Add a new entry to the pool, with the maximum priority of nice.
The entry may end up with a lower priority because all the other
deques were full.
obj can be pushed to the top (right side) of a deque by specifying
push_func like (lambda q, o: q.append(o)).
If the preferred nice value want_nice is full and force=False,
settle_push_func will be used to "settle for" a lower nice
value.
By default, this secondary function pushes to the top of the next
lowest priority.
If force=False, this method is not destructive; it will try to
push on a deque in the pool which is not full.
To force pushing an object into a specific priority even if they
are full, set force=True.
When you find yourself writing a docstring like this, you ought to hear a little voice saying, "hang on a second, this seems awfully complicated!" Complexity in interfaces is costly — it requires time for programmers to understand the interface, it adds maintenance burden, and it increases the number of bugs.
Those are my general concerns, but I have some specific concerns too:
- The function returns a tuple whose first element is
None
and whose second element is the priority. It would surely be more convenient for the caller if you just returned the priority. - The name
want_nice
is the preferred priority for the object, so it would be clearer if this name included the wordpriority
somewhere. - If
force=True
, then it looks as though objects will be silently discarded when a priority queue is full. This seems likely to be the cause of hard-to-find bugs in applications using the class. Is it really a good idea to provide this flag? - Even when
force=False
, it looks as though the object might silently fail to be pushed when all priority queues are full. Again, this seems likely to be the cause of bugs. Failures need to be reported to the caller, for example by raising an exception. - There is no documentation for the parameter
want_push_func
. What does this do? Presumably the mention ofpush_func
is a typo forwant_push_func
. - Presumably the intention is that there are just two valid values for
want_push_func
, namelylambda q, o: q.appendleft(o)
andlambda q, o: q.append(o)
. But if this is the case, it should be documented and not just left for the reader to guess. - The
want_push_func
interface seems inconvenient: surely it would be better for thepriority_deque
class to have two methods for these two cases? Or if you must have one method with two behaviours, take a flag or an enum instead of a function? - The same comments apply to
settle_push_func
: this also needs to be one of two functions, so it really ought to be a flag or enum. settle_push_func
looks like a bug magnet to me. It is only called in rare circumstances (when a priority queue is full) and so if you made an mistake in writing it then you might not discover the mistake in testing, because it might not be called by your test cases.- I can't help but think that in a priority queue system, it shouldn't be necessary to have special mechanisms for pushing on to the front of the queue — if the caller has a higher-priority item, they ought to be able to specify a higher priority. It looks to me as though the design, with a fixed set of priorities and a fixed number of objects at each priority, has created artificial limitations that you then have to try to work around, which leads to complexity in the interface and implementation. When you find yourself in this situation, it's worth taking a step back and trying to see if your design is really meeting your requirements.
Explore related questions
See similar questions with these tags.