class ActiveMessaging::ThreadedPoller

Attributes

busy[RW]
configuration[RW]
connection[RW]
pause[RW]
receiver[RW]
running[RW]
workers[RW]

Public Class Methods

new(connection='default', configuration={}) click to toggle source

connection is a string, name of the connection from broker.yml to use for this threaded poller instance

configuration is a list of hashes each has describes a group of worker threads for each group, define what priorities those workers will process

[
  {
    :pool_size  => 1       # number of workers of this type
    :priorities => [1,2,3] # what message priorities this thread will process
  }
]
# File lib/activemessaging/threaded_poller.rb, line 30
def initialize(connection='default', configuration={})
  # default config is a pool size of 3 worker threads
  self.configuration = configuration || [{:pool_size => 3}]
  self.connection = connection
  self.pause = 1
end

Public Instance Methods

died(worker, reason) click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 118
def died(worker, reason)
  busy.delete(worker)

  if running
    logger.info "uh oh, #{worker.inspect} died because of #{reason.class}"
    worker = Worker.new_link(current_actor)
    workers << worker
    receive(worker)
  else
    logger.info "check to see if busy is empty: #{busy.inspect}"
    if busy.empty?
      logger.info "all died: signal stopped"
      after(0){ signal(:shutdown) }
    end
  end
end
dispatch(message, worker) click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 97
def dispatch(message, worker)
  workers.delete(worker)
  busy << worker
  worker.execute!(message)
end
executed(worker) click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 103
def executed(worker)
  busy.delete(worker)

  if running
    workers << worker
    receive(worker)
  else
    worker.terminate if worker.alive?
    if busy.empty?
      logger.info "all executed: signal stopped"
      after(0) { signal(:shutdown) }
    end
  end
end
inspect() click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 139
def inspect
  "#<ThreadedPoller #{to_s}>"
end
log_status() click to toggle source

recursive method, uses celluloid 'after' to keep calling

# File lib/activemessaging/threaded_poller.rb, line 87
def log_status
  return unless logger.debug?
  logger.debug("ActiveMessaging::ThreadedPoller: conn:#{connection}, #{workers.count}, #{busy.count}, #{running}")
  after(10){ log_status }
end
logger() click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 147
def logger; ActiveMessaging.logger; end
receive(worker) click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 93
def receive(worker)
  receiver.receive!(worker) if (receiver && running && worker)
end
start() click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 37
def start
  logger.info "ActiveMessaging::ThreadedPoller start"

  # these are workers ready to use
  self.workers = []
  
  # these are workers already working
  self.busy = []
  
  # this indicates if we are running or not, helps threads to stop gracefully
  self.running = true
  
  # subscribe will create the connections based on subscriptions in processsors
  # (you can't find or use the connection until it is created by calling this)
  ActiveMessaging::Gateway.subscribe
  
  # create a message receiver actor, ony need one, using connection
  receiver_connection = ActiveMessaging::Gateway.connection(connection)
  self.receiver = MessageReceiver.new(current_actor, receiver_connection, pause)
  
  # start the workers based on the config
  configuration.each do |c|
    (c[:pool_size] || 1).times{ self.workers << Worker.new_link(current_actor, c) }
  end

  # once all workers are created, start them up
  self.workers.each{|worker| receive(worker)}

  # in debug level, log info about workers every 10 seconds
  log_status
end
stop() click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 69
def stop
  logger.info "ActiveMessaging::ThreadedPoller stop"
  # indicates to all busy workers not to pick up another messages, but does not interrupt
  # also indicates to the message receiver to stop getting more messages 
  self.running = false

  # tell each waiting worker to shut down.  Running ones will be allowed to finish
  receiver.terminate! if receiver.alive?
  logger.info "ActiveMessaging::ThreadedPoller receiver terminated"

  workers.each { |w| w.terminate! if w.alive? }
  logger.info "ActiveMessaging::ThreadedPoller workers terminated"


  after(0) { signal(:shutdown) } if stopped?
end
stopped?() click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 135
def stopped?
  (!running && busy.empty?)
end
to_s() click to toggle source
# File lib/activemessaging/threaded_poller.rb, line 143
def to_s
  @str ||= "#{Process.pid}-#{Thread.current.object_id}:#{self.object_id}"
end