2

I am facing some issues while implementing multi-threading in python. The issue is very specific to my use case. Having gone through numerous posts on the same, I deployed the most widely suggested/used method for doing so.

I start by defining my thread class as follows.

class myThread(Thread):
 def __init__(self, graphobj, q):
 Thread.__init__(self)
 self.graphobj = graphobj
 self.q = q
 def run(self):
 improcess(self.graphobj, self.q)

Post which I define my function that does all the processing required.

def improcess(graphobj, q):
 while not exitFlag:
 queueLock.acquire()
 if not q.empty():
 photo_id = q.get()
 queueLock.release()
 # Complete processing
 else:
 queueLock.release()

Now comes the part where I am stuck. I am able to run the below mentioned code exactly as it is without any issues. However if I try and wrap the same in a function as such it breaks down.

def train_control(graphobj, photo_ids):
 workQueue = Queue(len(photo_ids))
 for i in range(1,5):
 thread = myThread(graphobj=graphobj, q=workQueue)
 thread.start()
 threads.append(thread)
 queueLock.acquire()
 for photo_id in photo_ids:
 workQueue.put(photo_id)
 queueLock.release()
 while not workQueue.empty():
 pass
 exitFlag = 1
 for t in threads:
 t.join()

By breaking down I mean that the threads complete their work but they don't stop waiting i.e. the exitFlag is never set to 1. I am unsure as to how to make this work.

Unfortunately the design of our systems is such that this piece of codes needs to be wrapped in a function which can be invoked by another module, so pulling it out is not really an option.

Looking forward to hearing from experts on this. Thanks in advance.

Edit : Forgot to mention this in the first draft. I globally initialize exitFlag and set its value to 0.

Below is the minimum, verifiable code snippet that I created to capture this problem:

import threading
import Queue
globvar01 = 5
globvar02 = 7
exitFlag = 0
globlist = []
threads = []
queueLock = threading.Lock()
workQueue = Queue.Queue(16)
class myThread(threading.Thread):
 def __init__(self, threadID, q):
 threading.Thread.__init__(self)
 self.threadID = threadID
 self.q = q
 def run(self):
 print "Starting thread " + str(self.threadID)
 myfunc(self.threadID, self.q)
 print "Exiting thread " + str(self.threadID)
def myfunc(threadID, q):
 while not exitFlag:
 queueLock.acquire()
 if not workQueue.empty():
 thoughtnum = q.get()
 queueLock.release()
 print "Processing thread " + str(threadID)
 if (thoughtnum < globvar01):
 globlist.append([1,2,3])
 elif (thoughtnum < globvar02):
 globlist.append([2,3,4])
 else:
 queueLock.release()
def controlfunc():
 for i in range(1,5):
 thread = myThread(i, workQueue)
 thread.start()
 threads.append(thread)
 queueLock.acquire()
 for i in range(1,11):
 workQueue.put(i)
 queueLock.release()
 # Wait for queue to empty
 while not workQueue.empty():
 pass
 exitFlag = 1
 # Wait for all threads to complete
 for t in threads:
 t.join()
print "Starting main thread"
controlfunc()
print "Exiting Main Thread"
asked Nov 2, 2017 at 18:53
3
  • How about clearing exitFlag = 0 in train_control at the top before starting any threads Commented Nov 2, 2017 at 19:05
  • Instead of "forgetting to mention..." make a minimal reproducible example. Strip your code down to the basics while still being a standalone program that reproduces the problem described, and then post exactly that script along with current and desired output. Then we will be able to run the code without guessing at the missing parts. Commented Nov 2, 2017 at 23:08
  • @MarkTolonen : Sure. I just edited the question to post as suggested. Thanks ! Commented Nov 3, 2017 at 10:51

2 Answers 2

1

From your MCVE, the only thing missing is:

while not workQueue.empty():
 pass
global exitFlag # Need this or `exitFlag` is a local variable only.
exitFlag = 1

You could eliminate the queueLock and the exitFlag, however, by using a sentinel value in the Queue to shut down the worker threads, and it eliminates the spin-waiting. Worker threads will sleep on a q.get() and the main thread won't have to spin-wait for an empty queue:

#!python2
from __future__ import print_function
import threading
import Queue
debug = 1
console = threading.Lock()
def tprint(*args,**kwargs):
 if debug:
 name = threading.current_thread().getName()
 with console:
 print('{}: '.format(name),end='')
 print(*args,**kwargs)
globvar01 = 5
globvar02 = 7
globlist = []
threads = []
workQueue = Queue.Queue(16)
class myThread(threading.Thread):
 def __init__(self, threadID, q):
 threading.Thread.__init__(self)
 self.threadID = threadID
 self.q = q
 def run(self):
 tprint("Starting thread " + str(self.threadID))
 myfunc(self.threadID, self.q)
 tprint("Exiting thread " + str(self.threadID))
def myfunc(threadID, q):
 while True:
 thoughtnum = q.get()
 tprint("Processing thread " + str(threadID))
 if thoughtnum is None:
 break
 elif thoughtnum < globvar01:
 globlist.append([1,2,3])
 elif thoughtnum < globvar02:
 globlist.append([2,3,4])
def controlfunc():
 for i in range(1,5):
 thread = myThread(i, workQueue)
 thread.start()
 threads.append(thread)
 for i in range(1,11):
 workQueue.put(i)
 # Wait for all threads to complete
 for t in threads:
 workQueue.put(None)
 for t in threads:
 t.join()
tprint("Starting main thread")
controlfunc()
tprint("Exiting Main Thread")

Output:

MainThread: Starting main thread
Thread-1: Starting thread 1
Thread-2: Starting thread 2
Thread-3: Starting thread 3
Thread-4: Starting thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Exiting thread 3
Thread-4: Exiting thread 4
Thread-1: Exiting thread 1
Thread-2: Exiting thread 2
MainThread: Exiting Main Thread
answered Nov 3, 2017 at 16:14
Sign up to request clarification or add additional context in comments.

Comments

0

You need to make sure exitFlag is set to 0 (False) before spawning any threads otherwise in impprocess() they won't do anything and the queue will remain non-empty.

This problem could happen if you have exitFlag as a global and it's not cleared from a previous run.

answered Nov 2, 2017 at 19:08

2 Comments

Apologies, as I had forgotten to mention that I have initialized exitFlag globally to 0. In addition to which I only call the function once removing the need to set it again. Nonetheless I tried clearing the flag at the begging of the function without any returns.
I'd put debug logging in to see what's going on. After each time the queue lock is acquired or released, and each time the queue has a put or get.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.