I'm trying to implement a producer-consumer scenario using multiprocessing and a queue: the main process is the producer, and two subprocesses consume data from the queue. This works as long as nothing unusual happens, but the twist is that I want to be able to restart workers if they die (kill -9 workerpid). However, when I kill one or both workers, the restarted workers keep saying "queue was empty" even though the main process keeps stuffing data into the queue.
What am I missing here? (using Python 2.7.3 on Ubuntu 12.04)
import sys
import time
import multiprocessing
from Queue import Empty

workers = []
fqueue = multiprocessing.Queue()


class Worker(multiprocessing.Process):
    def run(self):
        queue = self._args[0]
        print "{0} starting up, queue at: {1}".format(self.pid, queue)
        while True:
            try:
                obj = queue.get(block=True, timeout=1)
                print "{0}: got from queue: {1}".format(self.pid, obj)
            except Empty:
                print "{0}: queue was empty".format(self.pid)
                continue
            except IOError, e:
                print "{0}: got IOError on queue: {1}".format(self.pid, e)
                return


if __name__ == "__main__":
    print "zipper starting up (2 workers)"

    for _ in range(0, 2):
        p = Worker(args=(fqueue,))
        workers.append(p)
        p.start()

    cnt = 0
    while True:
        for i in range(0, len(workers)):
            p = workers[i]
            if not p.is_alive():
                print "main: worker {0} is not alive".format(p.pid)
                p = Worker(args=(fqueue,))
                print "main: restarted worker: {0}".format(p)
                p.start()
                workers[i] = p

        print "main: tick"
        cnt += 1
        fqueue.put(cnt)
        time.sleep(2)
1 Answer
Have you seen this warning in the multiprocessing documentation?
Warning
If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.
So killing a process while it is using the queue can corrupt the queue's shared state and leave it unusable for every other process. In your case the most likely explanation is that the killed worker died while it was blocked inside get() and therefore still holding the queue's internal read lock; since the lock is never released, the surviving and restarted workers can never complete a get() and just keep timing out with "queue was empty".
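If you need the queue to survive a kill -9 on a worker, one common workaround is to put it behind a multiprocessing.Manager. A managed queue lives in the manager's own server process and the workers only talk to it through a proxy connection, so killing a worker drops that worker's connection but leaves the queue's state intact for everyone else. Below is a minimal sketch of that approach (Python 2.7 syntax; the consume function and the mqueue name are just illustrative, not part of your code):

import multiprocessing
from Queue import Empty

def consume(queue):
    # Same consumer loop as in the question, but reading from a managed queue.
    while True:
        try:
            obj = queue.get(block=True, timeout=1)
            print "{0}: got from queue: {1}".format(
                multiprocessing.current_process().pid, obj)
        except Empty:
            print "{0}: queue was empty".format(
                multiprocessing.current_process().pid)

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    mqueue = manager.Queue()  # the real queue lives in the manager's server process
    workers = [multiprocessing.Process(target=consume, args=(mqueue,))
               for _ in range(2)]
    for p in workers:
        p.start()
    # Your restart loop stays the same; just hand the replacement
    # workers the same mqueue proxy.

Note that this only protects the queue itself: an item the dying worker had already pulled off the queue (or was in the middle of pulling) can still be lost, so if every item must be processed you would also need some form of acknowledgement or re-queueing on top of this.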