Much in the spirit of this question, I have implemented a simple class for parallel workers and wanted to get some feedback on it. Can I make the code even more concise or readable? Are there any hidden problems that I should be aware of?
Worker class
    class Worker
      def initialize queue
        @queue = queue
        # Set the "idle state" (returned by the idle? method)
        # and a mutex for accessing it
        @idle_state = false
        @idle_mutex = Mutex.new
        # Set the "exit state" (returned by the done? method)
        # and a mutex for accessing it
        @exit_state = false
        @exit_mutex = Mutex.new
        poll
      end

      # Poll a queue for Proc objects to process
      def poll
        @thread = Thread.new do
          loop do
            while @queue.empty?
              set_idle true
              exit 0 if done?
              sleep 1
            end
            set_idle false
            job = @queue.pop
            job.call
          end
        end
      end

      def done?
        @exit_mutex.synchronize { @exit_state }
      end

      def shut_down
        @exit_mutex.synchronize { @exit_state = true }
        @thread.join
      end

      def idle?
        @idle_mutex.synchronize { @idle_state }
      end

      def set_idle state
        @idle_mutex.synchronize { @idle_state = state }
      end
    end
Usage
    # Set up a job queue
    queue = Queue.new

    # Spin up a few workers
    workers = []
    (1..5).each do
      workers << Worker.new(queue)
    end

    # Add some jobs to the queue
    (1..50).each do |i|
      queue << Proc.new { $stdout.print "Job ##{i}\n" }
    end

    # Shut down each of the workers once the queue is empty
    sleep 0.1 until queue.empty?
    workers.each { |worker| worker.shut_down }
1 Answer
Where not to lock
I'm not sure why you decided to lock on @exit_state and @idle_state, as they don't seem to have any potential to cause a race condition (except maybe when the manager calls shut_down on a worker, and even then all that would happen is that you 'lose' a round of one second, which is hardly catastrophic).
Where to lock
When you want to work on a queue concurrently, you should synchronize your work with the queue (in this case the @queue.empty? check and the @queue.pop that follows it), which you don't do.
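As a minimal sketch of what synchronizing the check and the pop together could look like: the names take_job and QUEUE_LOCK below are hypothetical and not part of the original code. The only point is that empty? and pop run under one lock shared by all workers, so no other consumer can slip in between them.

```ruby
require "thread"

# Hypothetical shared lock: every worker must go through it for this to help.
QUEUE_LOCK = Mutex.new

# Hypothetical helper: returns a job, or nil if the queue is empty.
def take_job(queue)
  QUEUE_LOCK.synchronize do
    # No other caller of take_job can pop between the check and the pop.
    queue.empty? ? nil : queue.pop
  end
end

queue = Queue.new
3.times { |i| queue << i }

# Four consumers compete for three items; the one that finds the queue
# empty gets nil instead of blocking.
results = Array.new(4) { Thread.new { take_job(queue) } }.map(&:value)
```

This only works if every consumer takes jobs through the same helper; a bare queue.pop elsewhere would bypass the lock.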
How to lock
You should also note that @queue.pop is implicitly @queue.pop(false), which means that if the queue is empty, it will block until a new message arrives. That is probably not what you want, since you can't be sure that the queue is still non-empty: between while @queue.empty? and job = @queue.pop another worker might have 'hijacked' your message.
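The difference between the two modes of pop can be seen in a small standalone sketch (the variable names are illustrative only): a non-blocking pop raises ThreadError on an empty queue, while a blocking pop parks the calling thread until an item is pushed.

```ruby
require "thread"

queue = Queue.new
queue << :job

first = queue.pop(true)   # non-blocking pop returns the waiting item

begin
  queue.pop(true)         # the queue is empty now
  outcome = :popped
rescue ThreadError
  outcome = :empty        # non-blocking pop raises instead of waiting
end

# A blocking pop on the empty queue waits until something arrives.
waiter = Thread.new { queue.pop }
sleep 0.1                 # give the waiter time to block
still_waiting = waiter.alive?
queue << :late_job
late = waiter.value       # the waiter wakes up with the new item
```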
Since Queue is thread-safe, you could write your worker like this:
    class Worker
      def initialize(queue)
        @thread = Thread.new { poll(queue) }
      end

      def poll(queue)
        until done?
          begin
            queue.pop(true).call
          rescue ThreadError
            # queue was empty
            sleep 1
          end
        end
        exit 0
      end

      def done?
        @done
      end

      def shut_down
        @done = true
      end
    end
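For completeness, here is a hedged variant of the worker above together with the usage from the question. It makes three assumptions that are mine rather than the answer's. First, only the worker thread should stop on shut-down, so the thread simply returns from poll instead of calling exit 0. Second, shut_down should wait for the thread, as in the original question. Third, a shorter sleep is acceptable. The results queue exists only to make the outcome observable.

```ruby
require "thread"

class Worker
  def initialize(queue)
    @done = false
    @thread = Thread.new { poll(queue) }
  end

  def poll(queue)
    until done?
      begin
        queue.pop(true).call
      rescue ThreadError
        # Queue was empty; wait briefly before checking again.
        # Note: a ThreadError raised inside a job is swallowed here too.
        sleep 0.05
      end
    end
    # Returning from the block ends this thread only.
  end

  def done?
    @done
  end

  def shut_down
    @done = true
    @thread.join # wait for the job in progress, if any, to finish
  end
end

queue = Queue.new
results = Queue.new # illustrative: collects each job's output
workers = Array.new(3) { Worker.new(queue) }
20.times { |i| queue << proc { results << i * 2 } }

# Shut down each of the workers once the queue is empty
sleep 0.01 until queue.empty?
workers.each(&:shut_down)
```

Because shut_down joins the thread, a job that was popped just before the queue became empty still completes before the program moves on.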