\$\begingroup\$

Much in the spirit of this question, I have implemented a simple class for parallel workers and wanted to get some feedback on it. Can I make the code even more concise or readable? Are there any hidden problems that I should be aware of?

Worker class

class Worker
  def initialize queue
    @queue = queue
    # Set the "idle state" (returned by the idle? method)
    # and a mutex for accessing it
    @idle_state = false
    @idle_mutex = Mutex.new
    # Set the "exit state" (returned by the done? method)
    # and a mutex for accessing it
    @exit_state = false
    @exit_mutex = Mutex.new
    poll
  end
  # Poll a queue for Proc objects to process
  def poll
    @thread = Thread.new do
      loop do
        while @queue.empty?
          set_idle true
          exit 0 if done?
          sleep 1
        end
        set_idle false
        job = @queue.pop
        job.call
      end
    end
  end
  def done?
    @exit_mutex.synchronize { @exit_state }
  end
  def shut_down
    @exit_mutex.synchronize { @exit_state = true }
    @thread.join
  end
  def idle?
    @idle_mutex.synchronize { @idle_state }
  end
  def set_idle state
    @idle_mutex.synchronize { @idle_state = state }
  end
end

Usage

# Set up a job queue
queue = Queue.new
# Spin up a few workers
workers = []
(1..5).each do
  workers << Worker.new(queue)
end
# Add some jobs to the queue
(1..50).each do |i|
  queue << Proc.new { $stdout.print "Job ##{i}\n" }
end
# Shut down each of the workers once the queue is empty
sleep 0.1 until queue.empty?
workers.each { |worker| worker.shut_down }

asked Jul 5, 2013 at 22:45
\$\endgroup\$

1 Answer

\$\begingroup\$

Where not to lock
I'm not sure why you decided to lock around @exit_state and @idle_state. Each is a single boolean flag that one thread writes and another reads, so they don't seem to have any potential to cause a race condition. The one possible exception is when the manager calls shut_down on a worker, and even then all that would happen is that the worker 'loses' one round of sleeping for a second, which is hardly catastrophic.

Where to lock
When several workers consume the same queue concurrently, you should synchronize your work with the queue. In this case that means the @queue.empty? check and the @queue.pop that follows it must happen as one step, which your code doesn't do.
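
To illustrate what synchronizing the check and the pop as one step could look like, here is a minimal sketch. The LockedTaker class and its take method are hypothetical names introduced only for this example; they are not part of the original code.

```ruby
# Hypothetical helper: performs "check, then pop" under a single lock.
class LockedTaker
  def initialize(queue)
    @queue = queue
    @lock = Mutex.new
  end

  # Returns the next job, or nil if the queue is currently empty.
  def take
    @lock.synchronize do
      # No other caller of take can drain the queue between these two steps.
      @queue.empty? ? nil : @queue.pop
    end
  end
end

queue = Queue.new
10.times { |i| queue << i }

taker = LockedTaker.new(queue)
taken = Queue.new # thread-safe collector for the jobs each thread received

threads = Array.new(4) do
  Thread.new do
    while (job = taker.take)
      taken << job
    end
  end
end
threads.each(&:join)

results = []
results << taken.pop until taken.empty?
```

This only protects consumers that go through the same helper, and as the next section points out, Queue already offers a simpler way to get the same effect.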

How to lock
You should also note that @queue.pop is implicitly @queue.pop(false), which means that if the queue is empty, it blocks until a new message arrives. That is probably not what you want, since you can't be sure the queue is still non-empty when you pop: between the while @queue.empty? check and job = @queue.pop, another worker might have 'hijacked' your message. A worker blocked this way will not look at done? again until another job arrives.
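
The difference between the blocking and non-blocking forms can be seen in a small sketch. The variable names are illustrative only; the behaviour shown is that of the standard Queue#pop with its non_block argument set to true.

```ruby
queue = Queue.new
queue << :only_job

# Non-blocking pop: returns immediately because a job is available.
first = queue.pop(true)

# The queue is now empty, so a non-blocking pop raises ThreadError
# instead of waiting for another job to arrive.
second =
  begin
    queue.pop(true)
  rescue ThreadError
    :empty
  end
```

Because the emptiness check and the removal happen inside the single pop call, there is no window in which another worker can take the job in between.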

Since Queue is thread-safe, you could write your worker like this:

class Worker
  def initialize(queue)
    @thread = Thread.new { poll(queue) }
  end
  def poll(queue)
    until done?
      begin
        queue.pop(true).call
      rescue ThreadError
        # queue was empty
        sleep 1
      end
    end
    exit 0
  end
  def done?
    @done
  end
  def shut_down
    @done = true
  end
end
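
As a possible variation on the class above (a different technique, not what the code above does), a worker can avoid polling altogether by relying on Queue#close. This assumes Ruby 2.3 or later, where pop on a closed, drained queue returns nil. ClosingWorker and its join method are hypothetical names for this sketch.

```ruby
# Hypothetical variant: the worker stops when the queue is closed and drained.
class ClosingWorker
  def initialize(queue)
    @thread = Thread.new { poll(queue) }
  end

  # Block until the worker thread has finished.
  def join
    @thread.join
  end

  private

  def poll(queue)
    # pop blocks while the queue is open and empty. Once the queue has been
    # closed and emptied it returns nil, which ends the loop.
    while (job = queue.pop)
      job.call
    end
  end
end

queue = Queue.new
finished = Queue.new # thread-safe record of completed job numbers

workers = Array.new(5) { ClosingWorker.new(queue) }
50.times { |i| queue << proc { finished << i } }

queue.close          # no more jobs will be added
workers.each(&:join) # wait for the remaining jobs to be processed
```

With this approach the thread simply returns from its block, so there is no need to call exit inside a worker (Kernel#exit raises SystemExit rather than just returning from the thread), and the caller can join each worker to know that all queued jobs have run.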
answered Feb 20, 2014 at 14:52
\$\endgroup\$
