Making a task multi-threaded using Queue in Ruby
Oftentimes we have a task that requires running a simple operation or method over every item in a list. Usually we use #each or #map to perform these operations one at a time, but sometimes running several operations at once is more efficient. This is especially true for any operation that makes a network call. In MRI (the standard Ruby interpreter), a thread that is waiting on I/O releases the global VM lock so other threads can run, which means that adding even one extra thread can make a big difference. CPU-bound work, by contrast, gains little from threads in MRI. The same pattern applies to any code that uses #each or #map to call a method or block on multiple objects.
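To see why threads help with network-bound work, here is a minimal sketch that uses sleep as a stand-in for a network call. This is an assumption: real requests vary in latency, but like a blocking socket read, sleep lets other threads run while it waits. The method fake_request and the helper timed are hypothetical names used only for this illustration.

```ruby
# fake_request is a hypothetical stand-in for a network call.
def fake_request(id)
  sleep 0.2
  id * 2
end

# Returns [block_result, elapsed_seconds] using a monotonic clock.
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  [result, Process.clock_gettime(Process::CLOCK_MONOTONIC) - start]
end

ids = [1, 2, 3, 4]

# One at a time: roughly 4 x 0.2 = 0.8 seconds.
sequential, sequential_time = timed { ids.map { |id| fake_request(id) } }

# One thread per item: the waits overlap, so roughly 0.2 seconds in total.
# Thread#value joins the thread and returns its block's result.
threaded, threaded_time = timed do
  ids.map { |id| Thread.new { fake_request(id) } }.map(&:value)
end

puts format("sequential: %.2fs, threaded: %.2fs", sequential_time, threaded_time)
```

Spawning one thread per item is fine for four items but not for 100,000, which is why it makes sense to cap the number of threads and feed them from a shared queue instead.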
Sharing the items of an Enumerable or Array between threads is easy with Queue, which is thread-safe and similar in function to Array. Unlike Array, Queue lacks most helper methods and feels surprisingly "low-level" for a core Ruby class. For instance, the only methods for adding or retrieving data are #push (aliased as #<< and #enq) and #pop (aliased as #shift and #deq), and retrieving an object has the cumbersome side effect of removing it from the queue. Despite its limitations, Queue is perfect for a simple first-in, first-out (FIFO) work list. Older Ruby versions have no method to convert an Array into a Queue (Ruby 3.1 added Queue.new(enumerable)), but there is a simple trick to populate one from any Enumerable object:
```ruby
(1..100_000).inject(Queue.new, :push)
```
Conveniently, Queue#push returns the Queue instance itself after adding the object. This lets us chain #push calls, passing each object as the argument to one call. That is what .inject(Queue.new, :push) does in the code above: it starts from an empty Queue and calls #push on the running result once per element:
```ruby
# Essentially the same
Queue.new.push(1).push(2).push(3) # ... .push(100_000)
```
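A small script can confirm these behaviors. It builds a queue with the inject trick, checks that #push hands back the very same queue object, and shows that #pop, #shift and #deq all remove the oldest item. It also uses pop(true), the non-blocking form, which raises ThreadError on an empty queue instead of waiting. The sample values are arbitrary.

```ruby
queue = (1..5).inject(Queue.new, :push)

# #push returns the receiver, which is what makes chaining and #inject work.
same = queue.push(6)
puts same.equal?(queue) # => true

# pop, shift and deq are aliases; each removes the oldest item (FIFO).
first  = queue.pop
second = queue.shift
third  = queue.deq
puts [first, second, third].inspect # => [1, 2, 3]
puts queue.size                     # => 3

# Drain the rest without blocking: pop(true) raises ThreadError when empty.
rest = []
begin
  loop { rest << queue.pop(true) }
rescue ThreadError
  # The queue is now empty.
end
puts rest.inspect # => [4, 5, 6]
```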
Now that our Queue is populated, we need to start a fixed number of threads to process it:
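```ruby
NUM_THREADS = 4
Thread.abort_on_exception = true

def do_stuff(object)
  # ...
end

@queue = (1..100_000).inject(Queue.new, :push)

@threads = Array.new(NUM_THREADS) do
  Thread.new do
    loop do
      # Remove the first object from @queue without blocking. Checking
      # #empty? and then calling the blocking #shift would be a race: another
      # thread could take the last object in between, leaving this one stuck.
      next_object = begin
        @queue.pop(true)
      rescue ThreadError # raised by pop(true) when the queue is empty
        break
      end
      do_stuff(next_object)
    end
  end
end

@threads.each(&:join)
```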
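Essentially, the code above creates 4 threads, each of which repeatedly pulls the next object out of @queue and calls #do_stuff on it until @queue is empty. Note that Thread.abort_on_exception = true makes an unhandled exception in any worker thread get re-raised in the main thread as soon as it happens, rather than only when that particular thread is joined. Since the main thread is waiting in #join at that moment, wrapping the joins in begin/ensure guarantees that cleanup code runs even if a worker fails: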
```ruby
begin
  @threads.each(&:join)
ensure
  cleanup # placeholder for your own teardown code
end
```
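Because the same idea applies to #map as well as #each, here is a sketch of how the pattern might be wrapped in a reusable helper that also collects results in their original order. The name parallel_map and its num_threads: keyword are hypothetical, not part of Ruby. It swaps in a different way of stopping the workers: it enqueues [item, index] pairs and then calls Queue#close (available since Ruby 2.3), after which #pop on an empty queue returns nil instead of blocking. Results are written by index under a Mutex, and #join re-raises any exception that killed a worker.

```ruby
# parallel_map is a hypothetical helper, not part of Ruby's standard library.
def parallel_map(items, num_threads: 4, &block)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # once closed and empty, #pop returns nil instead of blocking

  results = Array.new(queue.size)
  lock = Mutex.new

  workers = Array.new(num_threads) do
    Thread.new do
      # Leave error reporting to #join instead of also printing a warning.
      Thread.current.report_on_exception = false
      while (pair = queue.pop)
        item, index = pair
        value = block.call(item)
        lock.synchronize { results[index] = value }
      end
    end
  end

  # #join re-raises, in the calling thread, any exception that killed a worker.
  workers.each(&:join)
  results
end

squares = parallel_map(1..10, num_threads: 3) { |n| n * n }
puts squares.inspect # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

Closing the queue before starting the workers means no thread ever waits on an item that another thread already took. One caveat: the helper treats nil from #pop as "no more work", which is safe here only because every item is wrapped in an [item, index] pair.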