Last Updated: February 25, 2016 · hololeap

Making a task multi-threaded using Queue in Ruby

Oftentimes we have a task that requires running a simple operation or method over every item in a list. Usually we use #each or #map to perform these operations one at a time, but sometimes running several operations at once is more efficient. This is especially true of operations that spend most of their time waiting, such as network calls: while one thread waits on I/O, another can make progress, so even one extra thread can speed up the task considerably. (In MRI, the standard Ruby interpreter, a global VM lock lets only one thread run Ruby code at a time, so CPU-bound work gains little from threads.) The technique itself works with any code that calls #each or #map to apply a method or block to multiple objects.
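To see why waiting-heavy work benefits, here is a minimal sketch that uses `sleep` as a stand-in for a network call. That substitution is an assumption: like blocking I/O, `sleep` lets MRI run other threads while the sleeping one waits. `fake_network_call` and `elapsed` are hypothetical helpers, and spawning one thread per item is only for this comparison; the Queue-based approach described next caps the number of threads instead.

```ruby
# Hypothetical stand-in for a network call: sleep blocks this thread
# without holding MRI's global VM lock, much like waiting on a socket.
def fake_network_call(item)
  sleep 0.1
  item * 2
end

# Hypothetical timing helper: returns [block's result, seconds taken].
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  [result, Process.clock_gettime(Process::CLOCK_MONOTONIC) - start]
end

items = (1..8).to_a

sequential, sequential_time = elapsed { items.map { |i| fake_network_call(i) } }

# One thread per item, only for this comparison; Thread#value waits for each result
threaded, threaded_time = elapsed do
  items.map { |i| Thread.new { fake_network_call(i) } }.map(&:value)
end

puts format("sequential: %.2fs, threaded: %.2fs", sequential_time, threaded_time)
```

The sequential run pays for all eight sleeps one after another (about 0.8 s), while the threaded run overlaps them and finishes in roughly the time of one.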

Using an Enumerable or Array in a multi-threaded environment can be done easily with Queue, which is thread-safe and similar to Array in function. Unlike Array, Queue lacks many helper methods and feels surprisingly "low-level" for a native Ruby class. For instance, the only ways to move data in and out are #push (aliased as #<< and #enq) and #pop (aliased as #shift and #deq), and #pop has the cumbersome side effect of removing the object as you access it. Despite its limitations, Queue is perfect for a simple FIFO. Although there is no method to convert an Array into a Queue (until Ruby 3.1, where Queue.new accepts an Enumerable of initial items), there is a simple trick to populate one using any Enumerable object:

```ruby
(1..100_000).inject(Queue.new, :push)
```

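Conveniently, Queue#push returns the Queue instance itself after adding the object. #inject takes advantage of this: starting with Queue.new as the accumulator, it calls #push on the accumulator with each element in turn, and the Queue that #push returns becomes the accumulator for the next call. So .inject(Queue.new, :push) in the code above amounts to a chain of #push calls: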

```ruby
# Essentially the same
Queue.new.push(1).push(2).push(3) #... .push(100_000)
```
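To check the trick, and the Queue behavior described earlier, in isolation, here is a small sketch. The `drain` helper is hypothetical, written only for this example: it empties a Queue into an Array using the non-blocking form `pop(true)`, which raises ThreadError instead of waiting when the queue is empty. The #each_with_object line is an alternative that does not depend on #push returning the Queue.

```ruby
# Hypothetical helper: empty a Queue into an Array, front to back.
def drain(queue)
  items = []
  items << queue.pop(true) until queue.empty? # pop(true) never blocks
  items
end

from_inject = drain((1..5).inject(Queue.new, :push))
from_chain  = drain(Queue.new.push(1).push(2).push(3).push(4).push(5))
# Alternative that does not rely on #push returning self
from_ewo    = drain((1..5).each_with_object(Queue.new) { |n, q| q << n })

p from_inject # => [1, 2, 3, 4, 5]

# #<< and #enq are aliases of #push; #shift and #deq are aliases of #pop,
# and all of them take from the front, so Queue behaves as a FIFO.
q = Queue.new
q << :a
q.enq(:b)
taken = [q.shift, q.deq]
p taken # => [:a, :b]

# The queue is now empty, so a non-blocking pop raises ThreadError.
empty_error = begin
  q.pop(true)
  nil
rescue ThreadError => e
  e
end
p empty_error.class # => ThreadError
```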

Now that our Queue is populated, we need to start a fixed number of threads to process it:

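```ruby
NUM_THREADS = 4
Thread.abort_on_exception = true

def do_stuff(object)
  # ... the per-item work, e.g. a network request
end

@queue = (1..100_000).inject(Queue.new, :push)

@threads = Array.new(NUM_THREADS) do
  Thread.new do
    until @queue.empty?
      begin
        # Remove the first object from @queue without blocking. Another
        # thread may have taken the last object since our empty? check,
        # in which case a plain #shift would wait for an object that
        # never arrives.
        next_object = @queue.pop(true)
      rescue ThreadError
        break # the queue ran dry between the check and the pop
      end

      do_stuff(next_object)
    end
  end
end

@threads.each(&:join)
```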
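The loop above discards #do_stuff's return values, which is fine when replacing #each. If you are replacing #map instead, one option (an assumption beyond the original pattern) is to have each worker push `[input, output]` pairs onto a second Queue, which is equally thread-safe, and restore input order once every thread has been joined. The body of `do_stuff` here is a hypothetical stand-in, and `work`, `results`, and `by_input` are illustrative names.

```ruby
NUM_THREADS = 4

# Hypothetical stand-in for the real per-item work
def do_stuff(object)
  object * object
end

inputs  = (1..1_000).to_a
work    = inputs.inject(Queue.new, :push)
results = Queue.new # thread-safe, so several workers can push at once

threads = Array.new(NUM_THREADS) do
  Thread.new do
    until work.empty?
      begin
        item = work.pop(true) # non-blocking; raises ThreadError if another thread took the last item
      rescue ThreadError
        break
      end
      # Pair each output with its input: workers finish in no particular order
      results << [item, do_stuff(item)]
    end
  end
end
threads.each(&:join)

# All workers are done, so draining results here cannot race with them
by_input = Array.new(results.size) { results.pop }.to_h
mapped   = inputs.map { |i| by_input.fetch(i) }
p mapped.first(5) # => [1, 4, 9, 16, 25]
```

Keying results by input keeps the workers free of any shared ordering state; the only coordination between threads is the two Queues.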

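Essentially, the above code creates NUM_THREADS (here, 4) threads, each of which repeatedly pulls an object out of @queue and calls #do_stuff on it until @queue is empty. If a thread dies from an unhandled exception, #join re-raises that exception in the thread that called it, so errors surface during the #join call. Setting Thread.abort_on_exception = true goes further: the exception is re-raised in the main thread as soon as the worker dies, rather than waiting for #join. Either way, wrapping the #join calls in begin/ensure guarantees that your cleanup code runs: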

```ruby
begin
  @threads.each(&:join)
ensure
  cleanup # your own teardown code (hypothetical), run even if a thread raised
end
```
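The following sketch shows the #join behavior in isolation. It leaves Thread.abort_on_exception at its default of false, so the exception only surfaces when the worker is joined. The `cleanup` lambda is a hypothetical stand-in for the cleanup code above, and `report_on_exception` (available since Ruby 2.4) is turned off only to keep the failing thread from printing its own warning to stderr.

```ruby
cleaned_up = false
cleanup = -> { cleaned_up = true } # hypothetical stand-in for your cleanup code

worker = Thread.new do
  Thread.current.report_on_exception = false # suppress the stderr report
  raise ArgumentError, "bad item"
end

caught = nil
begin
  begin
    worker.join # re-raises the ArgumentError that killed the worker
  ensure
    cleanup.call # runs even though #join raised
  end
rescue ArgumentError => e
  caught = e
end

puts "join re-raised: #{caught.message}; cleaned up: #{cleaned_up}"
# prints "join re-raised: bad item; cleaned up: true"
```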
