Last Updated: February 25, 2016
·
525
· oystersauce8

A Ruby thread pool implementation

#!/usr/bin/env ruby
# A test script demonstrating a small token-based thread pool.
# To run: MAX_THREADS=2 THREAD_TIMEOUT=90 ./browser_tests.rb
# Captured before the requires, so the elapsed time reported at the end of the
# script also covers library loading.
start_time = Time.now
require 'pry'
require 'colorize'
# Startup banner naming the script being run.
print "started #{__FILE__}\n"

# A small token-based thread pool demo.
#
# A fixed set of tokens (one per allowed worker) bounds concurrency: a worker
# thread is only started once a token and a work item have both been claimed,
# and the token goes back into the pool when the finished thread is reaped.
#
# Settings come from the environment:
#   MAX_THREADS    - number of tokens / concurrent workers (default 2, minimum 1)
#   THREAD_TIMEOUT - seconds to wait for stragglers before exiting (default 5)
class App
  # Reads an integer setting from the environment.
  #
  # @param name [String] environment variable name
  # @param default [Integer] value used when the variable is unset, is not a
  #   valid integer, or is below +min+
  # @param min [Integer] smallest acceptable value
  # @return [Integer]
  def self.env_int(name, default, min)
    value = Integer(ENV[name])
    value < min ? default : value
  rescue ArgumentError, TypeError
    # TypeError: variable unset (nil). ArgumentError: not a valid integer.
    default
  end
  private_class_method :env_int

  # At least one token is required, otherwise run_loop could never start a
  # worker and would spin forever.
  MAX_THREADS = env_int('MAX_THREADS', 2, 1)
  # A negative value would make Kernel#sleep raise.
  THREAD_TIMEOUT = env_int('THREAD_TIMEOUT', 5, 0)

  # Builds the token pool and loads the work queue from the script's DATA
  # section.
  def initialize
    @tokens = Queue.new
    @work_queue = Queue.new
    @work_size = 0
    @threads = []
    MAX_THREADS.times { |t| @tokens.push t }
    queue_work
  end

  # Main scheduling loop: once per second, start at most one worker if a token
  # and a work item are available, then reap finished workers. When the work
  # queue is drained, wait THREAD_TIMEOUT seconds for running workers and
  # return.
  def run_loop
    stats_line
    loop do
      spawn_thread unless @tokens.empty? || @work_queue.empty?
      kill_hung
      if @work_queue.empty?
        stats_line
        print "sleep #{THREAD_TIMEOUT} before exit\n"
        sleep THREAD_TIMEOUT
        stats_line
        break
      end
      sleep 1
    end
  end

  # Prints a pool summary followed by one line per tracked worker thread.
  def stats_line
    print "TOKENS AVAILABLE #{@tokens.size} WORK_SIZE #{@work_size} PENDING #{@work_queue.size}\n"
    now = Time.now.to_i
    @threads.each do |t|
      print "**** THREAD #{t[:thread]} AGE #{now - t[:started].to_i} " \
            "TOKEN #{t[:token]} STATE #{t[:thread].status}\n"
    end
  end

  # Starts one worker thread for the next work item.
  #
  # The token and the work item are claimed here, in the calling thread, with
  # non-blocking pops. Claiming them inside the worker (with blocking pops)
  # lets two workers pass the same "not empty" check and leaves one of them
  # blocked forever on an empty queue.
  #
  # @return [Array<Hash>, nil] the tracked-thread list, or nil when no token
  #   or no work was available (nothing is started in that case)
  def spawn_thread
    token = take(@tokens)
    return if token.nil?

    work = take(@work_queue)
    if work.nil?
      # Nothing to do after all: hand the token back.
      @tokens.push token
      return
    end

    started = Time.now
    worker = Thread.new(token, work) do |tok, job|
      # A failing worker should take the process down; scope that to this
      # thread instead of flipping the process-wide switch.
      Thread.current.abort_on_exception = true
      Thread.current[:token] = tok
      print_padded(tok, "[working #{tok}")
      work_pp(job, tok)
      ran = rand(0..15) # simulated work duration, in seconds
      print_padded(tok, "sleep #{ran}")
      sleep ran
      print_padded(tok, "]")
    end
    @threads << { :started => started, :thread => worker, :token => token }
  end

  # Reaps worker threads that are no longer alive and returns their tokens to
  # the pool.
  #
  # NOTE: despite the name, this does not kill long-running threads; it only
  # collects the ones that have already terminated.
  def kill_hung
    print "\n\n"
    @threads.reject! do |t|
      next false if t[:thread].alive?

      print_padded(t[:token].to_i, "purging #{t[:token]}")
      @tokens.push t[:token]
      true
    end
  end

  # Announces the wait, sleeps THREAD_TIMEOUT seconds and terminates the
  # process.
  def wait_and_exit
    print "wait_and_exit()\n"
    print "sleep #{THREAD_TIMEOUT}\n"
    sleep THREAD_TIMEOUT
    exit
  end

  # Prints +str+ in the output column belonging to worker +indent+
  # (columns are 20 characters wide; column 0 is left empty).
  def print_padded(indent, str)
    # One print call per line keeps lines from different threads intact.
    print "#{' ' * (20 * (indent + 1))}#{str}\n"
  end

  # Loads up to ten work items from +source+ into the work queue. Items are
  # the blank-line-separated paragraphs of the text.
  #
  # @param source [#read] object whose +read+ returns the text; defaults to
  #   the script's DATA section
  # @return [Integer] number of queued work items
  def queue_work(source = DATA)
    paragraphs = source.read.to_s.split("\n\n").first(10)
    paragraphs.each_with_index do |steps, index|
      @work_queue.push({ :id => index, :steps => steps })
    end
    @work_size = @work_queue.size
  end

  # Prints the first four lines of a work item's steps in the worker's column.
  def work_pp(work, indent)
    work[:steps].split("\n").first(4).each do |line|
      print_padded(indent, line)
    end
  end

  private

  # Non-blocking pop: returns the next item, or nil when +queue+ is empty.
  def take(queue)
    queue.pop(true)
  rescue ThreadError
    nil
  end
end

pool = App.new
# SIGTSTP (Ctrl-Z) prints the pool status.
Signal.trap('TSTP') { pool.stats_line }
pool.run_loop
seconds = (Time.now - start_time).round(2)
print "#{__FILE__} exited normally (#{seconds} secs).\n"
__END__
# user visits dashboard
visit "http://www.google.com"

# clicks the Reports link
visit "http://www.yahoo.com"