class Fluent::ExecInput

Constants

SUPPORTED_FORMAT

Public Class Methods

new()
Calls superclass method Fluent::Input.new
# File lib/fluent/plugin/in_exec.rb, line 21
# Runs the superclass constructor, then loads the libraries this input
# depends on (exec helpers and timezone support).
def initialize
  super
  %w[fluent/plugin/exec_util fluent/timezone].each { |feature| require feature }
end

Public Instance Methods

configure(conf)
Calls superclass method Fluent::Input#configure
# File lib/fluent/plugin/in_exec.rb, line 48
# Applies the plugin configuration after the superclass has processed it.
#
# Reads the 'localtime', 'utc' and 'timezone' entries of +conf+, verifies
# that a tag source is available, prepares the time-parsing proc when a time
# key is set, and builds the parser that matches @format.
#
# conf:: configuration element, read here through string keys.
#
# Raises ConfigError when neither @tag nor @tag_key is set, or when the tsv
# format is selected while @keys is empty.
#
# NOTE(review): @tag, @tag_key, @time_key, @time_format, @format and @keys
# are presumably populated by the superclass call - confirm.
def configure(conf)
  super

  # 'localtime' wins over 'utc'; when neither is present @localtime keeps
  # whatever value it already had.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  unless @tag || @tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      # Capture the format in a local so the proc does not depend on later
      # changes to @time_format.
      f = @time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      # Without a format the value is converted with to_i.
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  end

  # Each parser is handed on_message as its callback. Any other @format
  # value leaves @parser untouched.
  case @format
  when :tsv
    if @keys.empty?
      raise ConfigError, "keys option is required on exec input for tsv format"
    end
    @parser = ExecUtil::TSVParser.new(@keys, method(:on_message))
  when :json
    @parser = ExecUtil::JSONParser.new(method(:on_message))
  when :msgpack
    @parser = ExecUtil::MessagePackParser.new(method(:on_message))
  end
end
run()
# File lib/fluent/plugin/in_exec.rb, line 120
# Hands the command's output stream (@io) to the configured parser and
# returns whatever the parser returns.
def run
  stream = @io
  @parser.call(stream)
end
run_periodic()
# File lib/fluent/plugin/in_exec.rb, line 124
# Runs @command once per @run_interval until @finished becomes true,
# feeding each run's output to @parser.
#
# Any StandardError raised while sleeping, spawning or parsing is logged
# and the loop carries on with the next cycle.
def run_periodic
  until @finished
    begin
      sleep @run_interval
      # The block form of IO.popen closes the pipe and waits for the child
      # when the block exits, including when the parser raises. This keeps
      # file descriptors and zombie processes from piling up across cycles.
      IO.popen(@command, "r") do |io|
        @parser.call(io)
      end
    rescue
      log.error "exec failed to run or shutdown child process", :error => $!.to_s, :error_class => $!.class.to_s
      log.warn_backtrace $!.backtrace
    end
  end
end
shutdown()
# File lib/fluent/plugin/in_exec.rb, line 99
# Stops the worker thread.
#
# In periodic mode (@run_interval set) it raises the @finished flag and
# waits for the thread. Otherwise it sends TERM to the child (@pid), waits
# up to 60 seconds for the thread, and falls back to KILL followed by an
# unbounded join.
def shutdown
  if @run_interval
    @finished = true
    return @thread.join
  end

  # Best-effort signal delivery: any StandardError from Process.kill is
  # deliberately ignored (e.g. Errno::ECHILD, Errno::ESRCH, Errno::EPERM).
  signal_child = lambda do |signal|
    begin
      Process.kill(signal, @pid)
    rescue
    end
  end

  signal_child.call(:TERM)
  return if @thread.join(60)  # TODO wait time

  signal_child.call(:KILL)
  @thread.join
end
start()
# File lib/fluent/plugin/in_exec.rb, line 88
# Launches the worker thread and stores it in @thread.
#
# With @run_interval set, the thread runs run_periodic after @finished is
# reset to false. Otherwise @command is spawned right away, its pipe and pid
# are kept in @io and @pid, and the thread runs run.
def start
  worker =
    if @run_interval
      @finished = false
      :run_periodic
    else
      @io = IO.popen(@command, "r")
      @pid = @io.pid
      :run
    end

  @thread = Thread.new(&method(worker))
end

Private Instance Methods

on_message(record)
# File lib/fluent/plugin/in_exec.rb, line 140
# Parser callback: emits one record through the router.
#
# The tag is taken from the record's @tag_key entry (removed from the
# record) and falls back to @tag. The time is taken from the @time_key entry
# (removed and passed through @time_parse_proc) and falls back to
# Engine.now. Any StandardError is logged rather than propagated.
def on_message(record)
  tag = record.delete(@tag_key) || @tag

  raw_time = record.delete(@time_key)
  time = raw_time ? @time_parse_proc.call(raw_time) : Engine.now

  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record)
end