2
\$\begingroup\$

I've programmed a minimal parallel workers queue in Ruby, and I wanted to know if it's correct, and if there are simpler ways of implementing it.

I'm aware of the existence of MonitorMixin, but it won't save any code, I think.

The design is kept intentionally trivial (e.g. it uses Thread.abort_on_exception).

# Usage:
#
# queue = ParallelWorkersQueue.new( <threads> )
# queue.push { <operation_1> }
# queue.push { <operation_2> }
# queue.join
#
class ParallelWorkersQueue
 def initialize( slots )
 @max_slots = slots
 @free_slots = slots
 @mutex = Mutex.new
 @condition_variable = ConditionVariable.new
 Thread.abort_on_exception = true
 end
 def push( &task )
 @mutex.synchronize do
 while @free_slots == 0
 @condition_variable.wait( @mutex )
 end
 @free_slots -= 1
 end
 Thread.new do
 yield
 @mutex.synchronize do
 @free_slots += 1
 @condition_variable.signal
 end
 end
 end
 def join
 @mutex.synchronize do
 while @free_slots < @max_slots
 @condition_variable.wait( @mutex )
 end
 end
 end
end
asked May 3, 2012 at 18:21
\$\endgroup\$
1
  • \$\begingroup\$ Independently of the improvements, there is bug, actually. If an error is raised when yielding, the following code is not executed - so the latter must be wrapped in an ensure block. \$\endgroup\$ Commented May 4, 2012 at 20:40

2 Answers 2

3
\$\begingroup\$

Like @Victor Moroz, I'm also a fan of Queue. I also prefer to keep the threads around: it's a bit more efficient, but more important, the code is simpler:

require 'thread'
class ParallelWorkersQueue
 def initialize(slots)
 @work = Queue.new
 @threads = slots.times.map do
 Thread.new do
 while (work = @work.deq) != :stop
 work[]
 end
 end
 end
 end
 def join
 @threads.each do
 @work.enq(:stop)
 end
 @threads.each(&:join)
 end
 def push(&task)
 @work.enq(task)
 end
end

Example of its use:

pwq = ParallelWorkersQueue.new(2)
5.times do |i|
 pwq.push do
 puts i
 end
end
pwq.join
# => 0
# => 1
# => 2
# => 3
# => 4

Replace Queue with SizedQueue if you wish to limit the amount of work that can be awaiting a worker thread.

answered May 3, 2012 at 19:08
\$\endgroup\$
1
\$\begingroup\$

I would prefer to use Queue in Erlang style (message passing), it looks cleaner. You can even pass something more useful than :done back, e.g. creating parallel map.

require "thread"
POOL_SIZE = 5
tasks = (0..99).map { |i| lambda { puts "Processing #{i}"; sleep 1 } }
message_queue = Queue.new
start_thread = 
 lambda do
 Thread.new(tasks.shift) do |task|
 task[]
 message_queue.push(:done)
 end
 end
tasks_left = tasks.length
[tasks_left, POOL_SIZE].min.times do
 start_thread[]
end
while tasks_left > 0
 message_queue.pop
 tasks_left -= 1
 start_thread[] unless tasks_left < POOL_SIZE
end
answered May 3, 2012 at 18:59
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.