I've written a minimal parallel workers queue in Ruby, and I'd like to know whether it's correct and whether there are simpler ways to implement it.
I'm aware of MonitorMixin, but I don't think it would save any code.
The design is kept intentionally trivial (e.g. it uses Thread.abort_on_exception).
```ruby
# Usage:
#
# queue = ParallelWorkersQueue.new( <threads> )
# queue.push { <operation_1> }
# queue.push { <operation_2> }
# queue.join
#
class ParallelWorkersQueue
  def initialize( slots )
    @max_slots = slots
    @free_slots = slots
    @mutex = Mutex.new
    @condition_variable = ConditionVariable.new
    Thread.abort_on_exception = true
  end

  def push( &task )
    @mutex.synchronize do
      while @free_slots == 0
        @condition_variable.wait( @mutex )
      end
      @free_slots -= 1
    end
    Thread.new do
      yield
      @mutex.synchronize do
        @free_slots += 1
        @condition_variable.signal
      end
    end
  end

  def join
    @mutex.synchronize do
      while @free_slots < @max_slots
        @condition_variable.wait( @mutex )
      end
    end
  end
end
```
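On the MonitorMixin point, here is a minimal sketch of the same class built on Ruby's `Monitor`, whose condition variables come from `MonitorMixin`. The saving is small but real: `wait_while` replaces both hand-written `while`/`wait` loops. Three choices here are this sketch's own, not the original's: the slot is released in an `ensure` clause, `broadcast` is used instead of `signal` so that a blocked `push` and a blocked `join` in different threads both recheck their conditions, and `Thread.abort_on_exception` is left at its default.

```ruby
require 'monitor'

# Sketch: the question's class on top of Monitor / MonitorMixin::ConditionVariable.
class ParallelWorkersQueue
  def initialize(slots)
    @max_slots = slots
    @free_slots = slots
    @monitor = Monitor.new
    @slot_freed = @monitor.new_cond # MonitorMixin::ConditionVariable
  end

  def push(&task)
    @monitor.synchronize do
      # Block until a slot is free, then claim it.
      @slot_freed.wait_while { @free_slots.zero? }
      @free_slots -= 1
    end
    Thread.new do
      begin
        task.call
      ensure
        # Give the slot back even if the task raised.
        @monitor.synchronize do
          @free_slots += 1
          @slot_freed.broadcast
        end
      end
    end
  end

  def join
    # Wait until every slot has been returned, i.e. all tasks have finished.
    @monitor.synchronize do
      @slot_freed.wait_while { @free_slots < @max_slots }
    end
  end
end
```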
Comment (Marcus, May 4, 2012 at 20:40): Independently of the improvements, there is actually a bug. If an error is raised while yielding, the code after the yield is not executed, so the slot-release code must be wrapped in an ensure block.
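Applied to the code in the question, the fix might look like the sketch below: the worker thread's body becomes `begin`/`ensure`, so the slot is returned and the waiting thread signalled even when the task raises. As an assumption of this sketch, `Thread.abort_on_exception` is left at its default (`false`), so a failing task only ends its own thread; with `true`, the exception would also be re-raised in the main thread. The `while` loops are written as modifiers, and the structure is otherwise unchanged.

```ruby
require 'thread'

# The question's class with the slot release moved into an ensure clause.
# Like the original, it assumes push and join are called from one producer thread,
# so `signal` (waking a single waiter) is enough.
class ParallelWorkersQueue
  def initialize(slots)
    @max_slots = slots
    @free_slots = slots
    @mutex = Mutex.new
    @condition_variable = ConditionVariable.new
  end

  def push(&task)
    @mutex.synchronize do
      @condition_variable.wait(@mutex) while @free_slots == 0
      @free_slots -= 1
    end
    Thread.new do
      begin
        task.call
      ensure
        # Runs whether the task returned normally or raised.
        @mutex.synchronize do
          @free_slots += 1
          @condition_variable.signal
        end
      end
    end
  end

  def join
    @mutex.synchronize do
      @condition_variable.wait(@mutex) while @free_slots < @max_slots
    end
  end
end
```

With a single slot, a task that raises would previously leave `@free_slots` at 0, and the next `push` would wait forever. With the `ensure`, the next task runs.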
2 Answers
Like @Victor Moroz, I'm also a fan of Queue. I also prefer to keep the threads around: it's a bit more efficient, but more importantly, the code is simpler:
```ruby
require 'thread'

class ParallelWorkersQueue
  def initialize(slots)
    @work = Queue.new
    # A fixed pool of workers; each one runs tasks until it dequeues :stop.
    @threads = slots.times.map do
      Thread.new do
        while (work = @work.deq) != :stop
          work[]
        end
      end
    end
  end

  def join
    # One :stop per worker, then wait for all workers to finish.
    @threads.each do
      @work.enq(:stop)
    end
    @threads.each(&:join)
  end

  def push(&task)
    @work.enq(task)
  end
end
```
Example of its use:
```ruby
pwq = ParallelWorkersQueue.new(2)
5.times do |i|
  pwq.push do
    puts i
  end
end
pwq.join
# Output (two workers run concurrently, so the order may vary):
# 0
# 1
# 2
# 3
# 4
```
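The bug raised in the comment on the question has a counterpart here. If a task raises, the worker running it terminates, and `join` re-raises that exception when `Thread#join` reaches that worker. If you would rather keep every worker alive and inspect failures afterwards, one option is to rescue per task. The `errors` reader is a hypothetical addition for illustration, not part of the answer:

```ruby
require 'thread'

# Variant of the Queue-based pool in which a failing task does not kill its worker.
class ParallelWorkersQueue
  attr_reader :errors # hypothetical: exceptions raised by tasks

  def initialize(slots)
    @work = Queue.new
    @errors = Queue.new # thread-safe, since several workers may append at once
    @threads = slots.times.map do
      Thread.new do
        while (work = @work.deq) != :stop
          begin
            work[]
          rescue StandardError => e
            @errors << e # record the failure and keep serving the queue
          end
        end
      end
    end
  end

  def join
    @threads.each { @work.enq(:stop) }
    @threads.each(&:join)
  end

  def push(&task)
    @work.enq(task)
  end
end
```

Whether swallowing exceptions this way is preferable to letting `join` raise depends on the caller; the original behavior has the advantage of failing loudly.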
Replace Queue with SizedQueue if you wish to limit the amount of work that can be awaiting a worker thread.
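A sketch of that change, assuming a hypothetical second constructor argument `backlog` for the maximum number of queued tasks. `SizedQueue#enq` blocks while the queue is full, so `push` applies back-pressure to the producer:

```ruby
require 'thread'

# The Queue-based pool with a bounded backlog (`backlog` is a hypothetical parameter).
class ParallelWorkersQueue
  def initialize(slots, backlog)
    @work = SizedQueue.new(backlog)
    @threads = slots.times.map do
      Thread.new do
        while (work = @work.deq) != :stop
          work[]
        end
      end
    end
  end

  def join
    # enq may block here too, until a worker takes something off the queue.
    @threads.each { @work.enq(:stop) }
    @threads.each(&:join)
  end

  def push(&task)
    @work.enq(task) # blocks while `backlog` tasks are already waiting
  end
end
```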
I would prefer to use Queue in Erlang style (message passing); it looks cleaner. You can even pass something more useful than :done back, e.g. to build a parallel map.
```ruby
require "thread"

POOL_SIZE = 5

tasks = (0..99).map { |i| lambda { puts "Processing #{i}"; sleep 1 } }
message_queue = Queue.new

# Starts a thread for the next task; the thread reports :done when it finishes.
start_thread =
  lambda do
    Thread.new(tasks.shift) do |task|
      task[]
      message_queue.push(:done)
    end
  end

tasks_left = tasks.length

# Fill the pool.
[tasks_left, POOL_SIZE].min.times do
  start_thread[]
end

# Each :done frees a slot; start another thread if any task has not been started yet.
while tasks_left > 0
  message_queue.pop
  tasks_left -= 1
  start_thread[] unless tasks_left < POOL_SIZE
end
```
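A minimal sketch of the parallel map mentioned above, in the same message-passing style: each thread sends `[index, value]` back instead of `:done`, and the main thread stores the value at its index, so the result order matches the input. `parallel_map` and its parameters are illustrative names, not an existing API. The sketch assumes the block does not raise, because a failed thread never sends its message and the main thread would wait forever.

```ruby
require "thread"

# Maps `block` over `items` using at most `pool_size` threads at a time.
def parallel_map(items, pool_size, &block)
  pending = items.each_with_index.to_a # [[item, index], ...] not yet started
  message_queue = Queue.new
  results = Array.new(items.length)

  # Start a thread for the next pending item; it reports [index, value].
  start_thread = lambda do
    item, index = pending.shift
    Thread.new { message_queue.push([index, block.call(item)]) }
  end

  [pending.length, pool_size].min.times { start_thread[] }

  items.length.times do
    index, value = message_queue.pop # wait for any thread to finish
    results[index] = value
    start_thread[] unless pending.empty?
  end
  results
end

squares = parallel_map([1, 2, 3, 4, 5], 2) { |x| x * x }
p squares # => [1, 4, 9, 16, 25]
```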