Last Updated: February 25, 2016 · hololeap

Making a task multi-threaded using Queue in Ruby

Often we have a task that requires running a simple operation or method over every item in a list. Usually we use #each or #map to perform these operations one at a time, but there are times when running several operations at once is more efficient. This is especially true of any operation that spends most of its time waiting, such as a network call. For that kind of work, even one extra thread can speed up the task considerably. The idea applies to any portion of code that uses #each or #map to call a method or block on multiple objects.
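As a rough illustration of that claim, the sketch below times the same work done sequentially and then split across two threads. The fake_request method is a hypothetical stand-in that only sleeps, and the items are split with #each_slice rather than a Queue, so this is a simplified setup and not the approach described in the rest of this post.

```ruby
# Hypothetical stand-in for a network call: it only waits, then returns.
def fake_request(item)
  sleep 0.05
  item
end

# Returns the wall-clock seconds taken by the given block.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

items = (1..6).to_a

# One at a time, as #each normally works.
sequential = elapsed { items.each { |item| fake_request(item) } }

# Two threads, each handling half of the items.
threaded = elapsed do
  workers = items.each_slice(3).map do |slice|
    Thread.new { slice.each { |item| fake_request(item) } }
  end
  workers.each(&:join)
end

puts format("sequential: %.2fs, two threads: %.2fs", sequential, threaded)
```

Because the simulated work is pure waiting, the threaded run should take roughly half as long. CPU-heavy work in standard Ruby (MRI) will not see the same gain.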

Using an Enumerable or Array in a multi-threaded environment can be done easily with Queue, which is thread-safe and similar to Array in its function. Unlike Array, Queue lacks many helper methods and feels surprisingly "low-level" for a native Ruby class. For instance, the only methods to access the data are #push, #pop, and #shift (#pop and #shift are aliases, and both have the cumbersome side effect of removing the object from the front of the queue as you access it). Despite its limitations, Queue is perfect for a simple FIFO. Although Queue has no method for converting from an Array (Ruby 3.1 and later do accept an Enumerable in Queue.new), there is a simple trick to populate one using any Enumerable object:

(1..100_000).inject(Queue.new, :push)

Conveniently, Queue#push returns the Queue instance after performing its operation. This lets us chain #push calls together, passing one object to each call. This is what .inject(Queue.new, :push) does in the code above:

# Essentially the same
Queue.new.push(1).push(2).push(3) #... .push(100_000)
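A minimal sketch to check the points above in irb: it populates a small Queue with the #inject trick, then shows that #shift and #pop both take from the front and that #push returns the Queue itself. The variable names are only for illustration.

```ruby
# Populate a Queue from any Enumerable using the inject trick.
queue = (1..5).inject(Queue.new, :push)

size_before = queue.size          # number of objects waiting

# Both calls remove and return the object at the FRONT of the queue.
first  = queue.shift
second = queue.pop

# #push returns the Queue itself, which is what makes chaining possible.
same_object = queue.push(6).equal?(queue)

# Drain whatever is left, in FIFO order.
remaining = []
remaining << queue.shift until queue.empty?

p first: first, second: second, remaining: remaining
```

Draining with `until queue.empty?` is safe here only because a single thread is reading from the queue.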

Now that our Queue is populated, we need to start a fixed number of threads to process it:

NUM_THREADS = 4
Thread.abort_on_exception = true

def do_stuff(object)
  # ...
end

@queue = (1..100_000).inject(Queue.new, :push)

@threads = Array.new(NUM_THREADS) do
  Thread.new do
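    loop do
      begin
        # Non-blocking shift: removes the first object from @queue, or
        # raises ThreadError if another thread already took the last one.
        # (Checking #empty? first would leave a gap between check and shift.)
        next_object = @queue.shift(true)
      rescue ThreadError
        break
      end

      do_stuff(next_object)
    end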
  end
end

@threads.each(&:join)

Essentially, the above code creates 4 threads which continuously pull an object out of @queue and call #do_stuff on it until @queue is empty. Note that with Thread.abort_on_exception = true, an unhandled exception raised inside any thread is immediately re-raised in the main thread. Since the main thread is waiting in the #join call at that point, the error can be handled there:

begin
  @threads.each(&:join)
ensure
  cleanup()
end
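Putting the pieces together, here is a self-contained sketch of the pattern. The process_in_threads helper and its num_threads parameter are hypothetical names, not part of Ruby. It also makes two assumptions that differ from the code above: it relies on #join re-raising a worker's exception (Ruby's default behavior, even without abort_on_exception), and it uses a non-blocking #pop so a worker exits cleanly when the queue runs dry.

```ruby
# Hypothetical helper: runs the block on every item using a pool of threads
# and returns the results (in completion order, not input order).
def process_in_threads(items, num_threads: 4, &block)
  queue   = items.inject(Queue.new, :push)
  results = Queue.new

  threads = Array.new(num_threads) do
    Thread.new do
      # Skip the per-thread stderr report; #join re-raises the error below.
      Thread.current.report_on_exception = false

      loop do
        begin
          item = queue.pop(true)  # non-blocking: raises ThreadError when empty
        rescue ThreadError
          break
        end
        results << block.call(item)
      end
    end
  end

  begin
    # #join re-raises any exception that terminated the joined thread.
    threads.each(&:join)
  ensure
    # Stand-in for cleanup: stop any workers that are still running.
    threads.each(&:kill)
  end

  Array.new(results.size) { results.pop }
end

doubled = process_in_threads(1..20, num_threads: 3) { |n| n * 2 }
puts doubled.sort.inspect
```

Results are sorted before printing because the order in which threads finish is not deterministic.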