EDIT: The full source code in question can be found here.
Is this code reasonably safe against dead/livelocks, and if not, how can I fix it?
- I know processes are usually recommended over threads for operations like this, but processing's version of queue doesn't do what I need.
- The choose function will be called on 100 objects, but in sequence. I think each one should be finished before the next, but I'm not certain.
The appraise function that choose calls only gets values from dicts, and does math on them.
---thread_queue.py---
    class ThreadsnQueues(threading.Thread):
        def __init__(self, queue, out_queue=None, func=None, args=None, semaphore=None):
            super(ThreadsnQueues, self).__init__()
            self.queue = queue
            self.out_queue = out_queue
            self.semaphore = semaphore
            if func is None:
                try:
                    func = self.queue.get(block=False)
                except Empty:
                    self.func = None
                    return
            if func is not None:
                if type(func) is list:
                    self.func = func[0]
                    self.args = func[1:]
                else:
                    self.func = func
                if args != [] and args != None:
                    self.args = args

    class ThreadScore(ThreadsnQueues):
        def __init__(self, queue, out_queue, func, semaphore):
            super(ThreadScore, self).__init__(queue, out_queue, func, None, semaphore)

        def run(self):
            try:
                s = self.queue.get() #should be a stock object
            except Empty:
                print "Empty queue?!"
                return
            except:
                return
            while True:
                try:
                    self.semaphore.acquire()
                    ret = self.func(s) #should return a score (int)
                    self.out_queue.put((s, ret)) #tuple with Stock, int
                    self.queue.task_done()
                    self.semaphore.release()
                    return
                except KeyboardInterrupt:
                    raise
                except ArithmeticError:
                    a, b, c = exc_info()
                    print a, b
                    pdb.post_mortem(c)
                except StandardError:
                    pass
---genome.py---
    def choose(self, stocks):
        """
        Scores the whole list of stocks, and puts the top twelve on a list to be purchased.
        """
        tmp = {}
        queue = Queue.Queue()
        out_queue = Queue.Queue()
        procs = []
        thread_limit = 10
        self.prog = ProgressBar(widgets=[Percentage(), ' ', Bar(marker=RotatingMarker()), ' ', ETA()],
                                maxval=len(stocks)).start()
        sym = threading.BoundedSemaphore(thread_limit)
        for stock in stocks:
            queue.put(stock)
        while queue.empty() != True:
            try:
                for j in range(thread_limit):
                    t = ThreadScore(queue=queue, out_queue=out_queue,
                                    func=self.appraise, semaphore=sym)
                    t.daemon = True
                    t.start()
                    procs.append(t)
            except ArithmeticError:
                raise
            except threrror:
                print "Encounted a thread error, blocking until threads complete..."
                [ p.join(5.0) for p in procs ]
                procs = []
            except KeyboardInterrupt:
                raise
            except StandardError as e:
                print e
                [ p.join(5.0) for p in procs ]
                procs = []
        [ p.join(timeout=5.0) for p in procs ]
        print "Finished scoring stocks, sorting..."
        self.prog.finish()
        while out_queue.qsize() > 0:
            l, r = out_queue.get() #unpack tuples of Stock, int
            tmp[l] = r #tmp[<Stock>] = score
        pmt = sorted(tmp, key=lambda x: tmp.get(x), reverse=True)
        self.prefers = { key: value for (key, value) in zip(pmt, sorted(tmp.values(), reverse=True))}
        self.scores = self.prefers.copy()
        return self.prefers
2 Answers
1. Comments on your code
1. We can't run this code. Where are the import statements for `Queue` and `Empty`?
2. As ChrisWue points out, there's no such thing as "reasonably safe against deadlocks". Either you're safe or you're not.
3. There's no documentation for the `ThreadsnQueues` and `ThreadScore` classes. What do these classes do and how am I supposed to use them?
4. What does the name `ThreadsnQueues` even mean?
5. I can't see what use there can possibly be for the `ThreadsnQueues` class. You don't override the `run` method, so this class is useless by itself.
6. What kind of objects are supposed to be in the `queue`? In `ThreadsnQueues.__init__` you have:

        func = self.queue.get(block=False)

   which suggests that the objects in the queue are functions, but in `ThreadScore.run` you have:

        s = self.queue.get() #should be a stock object

7. `ThreadsnQueues.__init__` calls its superclass method with no arguments:

        super(ThreadsnQueues, self).__init__()

   but in fact the `threading.Thread` class takes several keyword arguments (`group`, `target`, `name`, `args`, `kwargs`, and, from Python 3.3, `daemon`). By calling the superclass method with no arguments you make it impossible to use any of these features. This means that later on in the code you have to resort to writing:

        t.daemon = True

   because your interface doesn't allow you to pass the keyword argument `daemon=True` to your constructor.

   The proper way to handle this is for the subclass method to take arbitrary keyword arguments and pass them to the superclass method. Like this:

        def __init__(self, queue, out_queue=None, func=None, args=None,
                     semaphore=None, **kwargs):
            super(ThreadsnQueues, self).__init__(**kwargs)

8. It's rarely correct in Python to insist on types matching exactly, like you do here:

        if type(func) is list:

   What you care about here is that `func` supports the sequence interface, and the way to test for that is:

        if isinstance(func, collections.abc.Sequence):

9. In `ThreadsnQueues.__init__` you initialize a member `self.args`, but this is never used.
10. In `ThreadsnQueues.__init__`, `semaphore` defaults to `None`, but in `ThreadScore.run` you just call:

        self.semaphore.acquire()

    which will raise `AttributeError` if `self.semaphore` is `None`. If a semaphore is required, you should detect its absence in the constructor and raise an exception. (Or make the `semaphore` argument a required positional argument instead of a keyword argument.)
11. Your use of `StandardError` limits your code to Python 2.
12. Ignoring generic classes of exceptions like this:

        except StandardError: pass

    is almost always a bad idea. When you get an unexpected exception, you need to be informed about it so that you can fix the problem that caused it. If you have to suppress a particular exception, do so locally around the code that might generate it (and explain why you are doing so).
13. This all seems way too complex to me. It looks as though you have a collection of stock objects, and you want to call `self.appraise` on each stock object (using a fixed-size pool of worker threads), wait for them all to complete, and then sort the results in reverse order by score.

    In Python 3 you'd accomplish this using the built-in `concurrent.futures.ThreadPoolExecutor`, like this:

        sorted(ThreadPoolExecutor(max_workers=thread_limit)
               .map(lambda s:(self.appraise(s), s), stocks, timeout=5.0),
               reverse=True)

    In Python 2 you can use the (poorly documented) `multiprocessing.pool.ThreadPool`.

    Why doesn't this work for you? You write, "I know processes are usually recommended over threads for operations like this, but processing's version of queue doesn't do what I need." But what exactly do you need? Perhaps if you explained the problem then we could see if your reasoning makes any sense.
14. Do you actually get any benefit from using threads here? If your `appraise` method spends most of its time running Python code, then it seems unlikely that you will get any benefit, because all the worker threads will queue up waiting for the global interpreter lock. (On the other hand, if `appraise` spends most of its time waiting for file or network I/O, then your approach makes sense.)
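The thread-pool approach from the point about complexity can be sketched end to end as follows. This is a minimal Python 3 sketch under stated assumptions, not the asker's code: `appraise` is a stand-in scoring function, the stocks are plain strings, and `top_scores` is a hypothetical helper name. It sorts on the score alone, so the stock objects themselves never need to be comparable.

```python
from concurrent.futures import ThreadPoolExecutor


def appraise(stock):
    """Stand-in for the real scoring method (assumption: returns an int)."""
    return len(stock)


def top_scores(stocks, thread_limit=10, count=12):
    """Score every stock on a fixed-size thread pool; return the best `count`."""
    # The `with` block waits for all workers to finish before continuing.
    with ThreadPoolExecutor(max_workers=thread_limit) as pool:
        scores = list(pool.map(appraise, stocks))
    # Sort by score only; ties keep their input order because sorted() is stable.
    ranked = sorted(zip(stocks, scores), key=lambda pair: pair[1], reverse=True)
    return ranked[:count]


if __name__ == "__main__":
    print(top_scores(["AAPL", "IBM", "GOOGL", "F"], count=2))
```

Because `pool.map` re-raises any exception from a worker when its result is consumed, failures in `appraise` surface in the calling thread instead of being lost.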
2. Responses to comments
> `ThreadsnQueues` is an ABC and parent to both `ThreadScore`, and another class called `ThreadTrue`

It's no good explaining this to me now! Your code needs to make this clear to its readers. Think of the future when someone may need to maintain this code, and you might not be around to answer questions.

> The name was just meant to say it worked with queues.

It did not convey that information to me. (Also, "work with queues" how?)
> Isn't it sufficient to just keep testing my code, (e.g., by adding code to call `sys.exc_info()` and using that to run `pdb`'s post mortem analysis, or simply capturing `StandardException as e` and printing it) and ignore exceptions for actual releases? It seems like if I run into an exception I've never seen before, it couldn't be handled by some premeditated heuristic.

My point is, what are you going to do if you get an unexpected exception in an actual release? At the moment you just suppress the exception and keep running. But that could be disastrous: maybe the exception is telling you something important that needs to be fixed, such as the computer running out of disk space, or some Python library getting corrupted, or who knows? By suppressing exceptions you prevent anyone being informed of problems in a timely manner.
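One way to act on this, sketched under assumptions: handle only the exception you expect, close to the call that can raise it, and log anything else with its traceback before letting it propagate. The names `score_one` and `appraise` are hypothetical, and `ZeroDivisionError` is just an example of a failure the caller has decided is recoverable.

```python
import logging

logger = logging.getLogger("scoring")


def score_one(appraise, stock):
    """Return the score for `stock`, or None if the expected failure occurs."""
    try:
        return appraise(stock)
    except ZeroDivisionError:
        # Assumed to be an expected, recoverable failure for this example.
        logger.warning("could not score %r: division by zero", stock)
        return None
    except Exception:
        # Unexpected: record the full traceback, then let it propagate.
        logger.exception("unexpected error while scoring %r", stock)
        raise
```

The unexpected branch still stops the program, but it leaves a record of what went wrong, which a bare `pass` does not.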
> The GIL probably is getting in the way. I began using threads to handle network and file I/O. This is likely out of their domain.

I had a quick look at the source for the `appraise` method and it executes database queries over a network connection, so it will spend some (maybe most) of its time waiting for the responses from the database, and so the GIL might not be a bottleneck.
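To illustrate why time spent waiting is not serialized by the GIL, here is a small Python 3 sketch in which `time.sleep` stands in for a database round trip. That substitution is an assumption (the real `appraise` is not shown in this post), as are the names `fake_query` and `timed_run`. Sleeping releases the GIL, as a blocking socket read does, so the waits overlap across threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_query(key, delay=0.2):
    """Stand-in for a network round trip: blocks without holding the GIL."""
    time.sleep(delay)
    return key * 2


def timed_run(keys, workers):
    """Run fake_query over `keys` on `workers` threads; return (results, seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fake_query, keys))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    for workers in (1, 4):
        results, seconds = timed_run([1, 2, 3, 4], workers)
        print(workers, "worker(s):", results, "in %.2fs" % seconds)
```

With one worker the four waits run back to back; with four workers they run side by side, so the second run should finish in roughly a quarter of the time.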
P.S. Although it's out of scope for this code review, I couldn't help noticing this line:
vals = {key: conn.execute(select([a[1] for a in s.this.c.items() if a[0] != 'key']).where(s.this.c.key == key)).fetchall() for key in self.keys }
This approach is a bad idea: it executes a database query for every key. It would be much more efficient to execute one query that fetches the results for all the keys at once.
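The difference can be sketched with the standard-library `sqlite3` module rather than the SQLAlchemy code quoted above (a deliberate substitution so the example is self-contained). The `quotes` table, its columns, and all function names are invented for illustration.

```python
import sqlite3


def fetch_per_key(conn, keys):
    """One query per key: the pattern used by the quoted line."""
    return {
        key: conn.execute(
            "SELECT price FROM quotes WHERE key = ? ORDER BY rowid", (key,)
        ).fetchall()
        for key in keys
    }


def fetch_all_at_once(conn, keys):
    """A single query with an IN clause, grouped by key afterwards."""
    placeholders = ", ".join("?" for _ in keys)
    rows = conn.execute(
        "SELECT key, price FROM quotes WHERE key IN (%s) ORDER BY rowid"
        % placeholders,
        list(keys),
    ).fetchall()
    grouped = {key: [] for key in keys}
    for key, price in rows:
        grouped[key].append((price,))
    return grouped


def demo_connection():
    """Build a throwaway in-memory table (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE quotes (key TEXT, price REAL)")
    conn.executemany(
        "INSERT INTO quotes VALUES (?, ?)",
        [("AAPL", 1.0), ("AAPL", 2.0), ("IBM", 3.0)],
    )
    return conn
```

Both functions return the same dictionary, but the second makes one round trip regardless of how many keys there are, which matters most when each round trip crosses a network.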
- To go through point-by-point, starting with #5, ThreadsnQueues is an ABC and parent to both ThreadScore, and another class called ThreadTrue. The name was just meant to say it worked with queues. As for #6, ThreadTrue does retrieve functions from its queue. That only happens when no function is supplied to the constructor, so it will have a similar issue to #10. #7: calling `t.daemon = True` works for me, but I may add the `**kwargs` later. #8: good to know, I'll make that change. #10: duly noted. #11: If there is an equivalent subset of errors to StandardError in Python 3, I will use it. – Chara, Oct 21, 2013 at 14:44
- #12: Isn't it sufficient to just keep testing my code, (e.g., by adding code to call `sys.exc_info()` and using that to run pdb's post mortem analysis, or simply capturing `StandardException as e` and printing it) and ignore exceptions for actual releases? It seems like if I run into an exception I've never seen before, it couldn't be handled by some premeditated heuristic. #13: I was not aware of `multiprocessing.pool.ThreadPool`, but I will check it out now. #14: The GIL probably is getting in the way. I began using threads to handle network and file I/O. This is likely out of their domain. – Chara, Oct 21, 2013 at 14:50
- @user2845306: see revised post for my responses. – Gareth Rees, Oct 22, 2013 at 11:30
- Thanks for answering my extra questions. I've noted the changes as issues on Google Code, and I'm going to start work on them ASAP. – Chara, Oct 22, 2013 at 16:49
1. "reasonably safe against deadlocks" is a strange expression :)
2. [Struck out] In `ThreadsnQueues` the `if func is not None:` can be turned into an `else`.
3. Your code in `ThreadScore` indicates that some exceptions might be expected, because you handle them and continue. The problem is that if one occurs while you hold the semaphore, the semaphore is never released. You should move the release into a `finally` block.
4. In `choose` you join on all threads in multiple places, and this code `[ p.join(5.0) for p in procs ]` gets repeated several times. It should be refactored into a method.
5. I'm not sure what the point of the semaphore is. The way I read your code is:
   - You initialize the semaphore with N.
   - Then you spawn N threads, each calling acquire/release in its `run` method.

   This means that all threads can simultaneously enter the block protected by the semaphore. So what's the point of having it?
6. In `ThreadScore.run()` the main part is the `while True` loop. However, it never seems to take anything off the queue (it only seems to do so once, at the beginning of the `run` method), so how does it work?
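The advice about releasing in a `finally` block can be sketched as follows, assuming Python 3 and hypothetical names (`score_with_limit`, `explicit_version`, `func`). Acquiring the semaphore with a `with` statement releases it exactly once on every exit path, including exceptions. Keeping the acquire outside the `try` also avoids releasing a slot that was never taken, which a `BoundedSemaphore` reports as a `ValueError`.

```python
import threading


def score_with_limit(semaphore, func, item):
    """Call func(item) while holding one slot of `semaphore`."""
    # `with` acquires on entry and releases on exit, even if func raises.
    with semaphore:
        return func(item)


def explicit_version(semaphore, func, item):
    """The same thing spelled out with try/finally."""
    semaphore.acquire()  # outside the try: only release what was acquired
    try:
        return func(item)
    finally:
        semaphore.release()
```

Either form keeps the acquire and release paired, so a failing call cannot leak a slot and starve the remaining threads.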
- Your suggestion in point 2 would change the meaning of the code, since `func` may be updated inside the `if func is None:`. – Gareth Rees, Oct 19, 2013 at 16:30
- @ChrisWue I used to have the semaphore released in the finally block, but sometimes this would result in semaphores being released more times than they were called (probably from queue.get failing.) My plan with the semaphore was to limit the number of concurrent threads, but there must be a better way of doing it. – Chara, Oct 20, 2013 at 11:14
- The number of concurrent threads is already limited by the number of threads you spin up. As I said, right now it doesn't really do anything. And the release call should be put in the finally block for the `try` inside the `while True` loop, not in the outer one. – ChrisWue, Oct 20, 2013 at 19:01
- So, if I remove the semaphores, would I still need a finally block for anything? Also, while I get the use of ordinary locks, I don't quite get when semaphores should really be used. Why would you need multiple threads to access a resource, but still need to limit just how many can? It seems like you should only be able to use many threads, or not. – Chara, Oct 21, 2013 at 14:26
- Also, point number 6: it's been a while since I wrote that code, but I think I was having about one thread per queue item. So the thread would continue trying to finish its task if it failed (the `except StandardError: pass`) and then I guess the `while queue.empty() == True` loop would run again. In retrospect, that's probably pretty inefficient. – Chara, Oct 21, 2013 at 14:34