class NewRelic::Agent::PipeChannelManager::Listener

Attributes

pipes[RW]

This attr_accessor intentionally provides unsynchronized access to the @pipes hash. It is used to look up the write end of the pipe from within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.

select_timeout[RW]

Number of seconds passed as the timeout argument to IO.select in the listener thread's loop. It is initialized to 60. Like the other accessors on this class, it is read and written without taking @pipes_lock.

thread[R]
timeout[RW]

Number of seconds that may elapse after a pipe's last read before clean_up_pipes closes it. It is initialized to 360. Like the other accessors on this class, it is read and written without taking @pipes_lock.

Public Class Methods

new() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 140
# Builds a listener with an empty pipe registry and default timeouts.
#
# @pipes maps a caller-supplied id to a Pipe, and @pipes_lock is the Mutex
# the other methods take before touching that hash. @started, @thread and
# @wake are not assigned here; they are set lazily by #start and #wake.
def initialize
  # Registry state: the lock first, then the hash it protects.
  @pipes_lock = Mutex.new
  @pipes = {}

  # Seconds since a pipe's last read before clean_up_pipes closes it.
  @timeout = 360
  # Seconds IO.select waits in the listener loop before returning.
  @select_timeout = 60
end

Public Instance Methods

close_all_pipes() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 206
# Closes every registered pipe and replaces the registry with a new, empty
# hash, all while holding @pipes_lock. Nil entries in the registry are
# skipped rather than closed.
def close_all_pipes
  @pipes_lock.synchronize do
    @pipes.each_value { |registered| registered.close if registered }
    # Assign a fresh hash instead of clearing the existing one in place.
    @pipes = {}
  end
end
register_pipe(id) click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 152
# Creates a new Pipe, stores it in the registry under +id+ (replacing any
# existing entry for that id), and then calls #wakeup.
#
# The registry write happens under @pipes_lock; the wakeup call happens
# after the lock has been released. Returns whatever #wakeup returns.
def register_pipe(id)
  @pipes_lock.synchronize { @pipes[id] = Pipe.new }
  wakeup
end
start() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 160
# Starts the listener thread. Does nothing if @started is already true.
#
# The thread is created through NewRelic::Agent::Threading::AgentThread
# under the name 'Pipe Channel Manager' and loops until
# #should_keep_listening? returns a falsy value. Each iteration:
#
# 1. calls #clean_up_pipes;
# 2. snapshots, under @pipes_lock, the +out+ end of every registered pipe
#    plus the +out+ end of the wake pipe;
# 3. records a 'Supportability/Listeners' metric (skipped until +now+ has
#    been set by a previous iteration);
# 4. waits in IO.select for up to @select_timeout seconds;
# 5. passes each ready handle other than the wake pipe to
#    #merge_data_from_pipe, then reads one byte from the wake pipe if it
#    was among the ready handles.
def start
  return if @started == true
  @started = true
  @thread = NewRelic::Agent::Threading::AgentThread.create('Pipe Channel Manager') do
    # Time at which IO.select most recently returned ready handles; nil
    # until that has happened once.
    now = nil
    loop do
      clean_up_pipes

      # Build the select list while holding the lock so the registry cannot
      # change underneath the map; the wake pipe is always included.
      pipes_to_listen_to = @pipes_lock.synchronize do
        @pipes.values.map{|pipe| pipe.out} + [wake.out]
      end

      # Elapsed time between the last ready select and this point.
      NewRelic::Agent.record_metric('Supportability/Listeners',
        (Time.now - now).to_f) if now

      # IO.select returns nil on timeout, in which case nothing is read and
      # +now+ keeps its previous value.
      if ready = IO.select(pipes_to_listen_to, [], [], @select_timeout)
        now = Time.now

        ready_pipes = ready[0]
        ready_pipes.each do |pipe|
          merge_data_from_pipe(pipe) unless pipe == wake.out
        end

        # Consume a single byte from the wake pipe when it was readable.
        wake.out.read(1) if ready_pipes.include?(wake.out)
      end

      break unless should_keep_listening?
    end
  end
  sleep 0.001 # give time for the thread to spawn
end
started?() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 219
# Reports whether the listener has been started.
#
# Returns the raw @started instance variable: true after #start, false after
# #stop_listener_thread, and nil if neither has been called (initialize does
# not assign it).
def started?
  @started
end
stop() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 198
# Shuts the listener down. Does nothing unless @started is exactly true.
#
# In order: stops and joins the listener thread, closes all registered
# pipes, then closes the wake pipe and drops the reference to it. Always
# returns nil.
def stop
  if @started == true
    stop_listener_thread
    close_all_pipes
    @wake.close
    @wake = nil
  end
end
stop_listener_thread() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 192
# Asks the listener thread to finish and waits for it to exit.
#
# Clears @started, calls #wakeup, and then joins the thread. The join is
# skipped when no thread has been created yet: this method is public, and
# without the guard a call before #start would raise NoMethodError on nil.
#
# Returns the joined thread, or nil when there was no thread to join.
def stop_listener_thread
  @started = false
  wakeup
  @thread.join if @thread
end
wake() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 215
# Returns the Pipe used to interrupt the listener's IO.select, creating it on
# first use and caching it in @wake. No lock is taken around the creation.
def wake
  @wake = Pipe.new unless @wake
  @wake
end
wakeup() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 148
# Appends a single '.' to the +in+ end of the wake pipe. The listener loop
# selects on that pipe's +out+ end, so the write makes its IO.select return.
# Returns the result of the << call.
def wakeup
  writer = wake.in
  writer << '.'
end

Protected Instance Methods

clean_up_pipes() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 257
# Closes registered pipes that have gone unread for longer than @timeout
# seconds, then removes every pipe whose +out+ end is closed from the
# registry. Both steps run under @pipes_lock.
#
# A pipe whose last_read converts to 0.0 (for example nil) counts as stale.
# Returns the result of Hash#reject!, which is nil when nothing was removed.
def clean_up_pipes
  @pipes_lock.synchronize do
    @pipes.values.each do |candidate|
      # The clock is read once per pipe, as part of the staleness check.
      stale = candidate.last_read.to_f + @timeout < Time.now.to_f
      next unless stale

      candidate.close unless candidate.closed?
    end

    @pipes.reject! { |_id, candidate| candidate.out.closed? }
  end
end
find_pipe_for_handle(out_handle) click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 268
# Looks up the registered pipe whose +out+ end equals +out_handle+.
#
# The search runs under @pipes_lock. Returns the first matching pipe, or nil
# when no registered pipe matches.
def find_pipe_for_handle(out_handle)
  @pipes_lock.synchronize do
    @pipes.each_value.find { |candidate| candidate.out == out_handle }
  end
end
merge_data_from_pipe(pipe_handle) click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 225
# Handles one readable pipe handle reported by IO.select.
#
# Resolves the handle to its registered pipe and calls +read+ on it. A
# payload equal to Pipe::READY_MARKER triggers pipe.after_fork_in_parent.
# Any other non-empty payload is passed to #unmarshal and, if that yields a
# value, destructured into an endpoint and items and handed to the agent's
# merge_data_for_endpoint. A nil or empty payload is ignored.
#
# Finally the pipe is closed if it reports eof?.
def merge_data_from_pipe(pipe_handle)
  pipe = find_pipe_for_handle(pipe_handle)
  raw_payload = pipe.read

  unless !raw_payload || raw_payload.empty?
    if raw_payload == Pipe::READY_MARKER
      pipe.after_fork_in_parent
    elsif (payload = unmarshal(raw_payload))
      endpoint, items = payload
      NewRelic::Agent.agent.merge_data_for_endpoint(endpoint, items)
    end
  end

  pipe.close if pipe.eof?
end
should_keep_listening?() click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 253
# Decides whether the listener loop should run another iteration.
#
# Returns @started when it is truthy. Otherwise returns, under @pipes_lock,
# the first registered pipe whose +in+ end is still open, or nil when there
# is none. The loop therefore keeps running after a stop request for as long
# as any registered pipe has an open +in+ end.
def should_keep_listening?
  return @started if @started

  @pipes_lock.synchronize do
    @pipes.values.find { |candidate| !candidate.in.closed? }
  end
end
unmarshal(data) click to toggle source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 243
# Deserializes +data+ with Marshal.load inside
# NewRelic::LanguageSupport.with_cautious_gc.
#
# Returns the loaded object. If a StandardError is raised, logs the failure
# at error level, logs the Base64-encoded payload at debug level, and
# returns nil.
#
# NOTE(review): Marshal.load is only safe on trusted input; the log message
# indicates the data comes from a Resque child process. Confirm that nothing
# else can write to these pipes.
def unmarshal(data)
  begin
    NewRelic::LanguageSupport.with_cautious_gc { Marshal.load(data) }
  rescue StandardError => failure
    ::NewRelic::Agent.logger.error("Failure unmarshalling message from Resque child process", failure)
    ::NewRelic::Agent.logger.debug(Base64.encode64(data))
    nil
  end
end