class Fluent::Supervisor

Public Class Methods

default_options() click to toggle source
# File lib/fluent/supervisor.rb, line 81
# Default command-line options for a supervisor.
#
# A fresh Hash (with fresh Arrays for :plugin_dirs and :libs) is built on
# every call, so callers may mutate the result freely.
#
# @return [Hash{Symbol=>Object}]
def self.default_options
  {
    config_path: Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],
    log_level: Fluent::Log::LEVEL_INFO,
    log_path: nil,
    daemonize: nil,
    libs: [],
    setup_path: nil,
    chuser: nil,
    chgroup: nil,
    suppress_interval: 0,
    suppress_repeated_stacktrace: true,
    without_source: false,
    use_v1_config: true,
    supervise: true
  }
end
get_etc_group(group) click to toggle source
# File lib/fluent/supervisor.rb, line 30
# Look up a group entry by numeric id or by name.
#
# A value whose string form round-trips through Integer ("0", "1000", or an
# Integer such as 1000) is treated as a gid. Anything else ("adm", "007")
# is treated as a group name. The argument is normalized with #to_s first,
# so Integer gids work too; previously an Integer fell through to
# Etc.getgrnam and raised TypeError.
#
# @param group [String, Integer] gid or group name
# @return [Etc::Group]
# @raise [ArgumentError] from Etc when no such group exists
def self.get_etc_group(group)
  id = group.to_s
  if id.to_i.to_s == id
    Etc.getgrgid(id.to_i)
  else
    Etc.getgrnam(id)
  end
end
get_etc_passwd(user) click to toggle source
# File lib/fluent/supervisor.rb, line 22
# Look up a passwd entry by numeric id or by user name.
#
# A value whose string form round-trips through Integer ("0", "1000", or an
# Integer such as 1000) is treated as a uid. Anything else ("fluent",
# "007") is treated as a user name. The argument is normalized with #to_s
# first, so Integer uids work too; previously an Integer fell through to
# Etc.getpwnam and raised TypeError.
#
# @param user [String, Integer] uid or user name
# @return [Etc::Passwd]
# @raise [ArgumentError] from Etc when no such user exists
def self.get_etc_passwd(user)
  id = user.to_s
  if id.to_i.to_s == id
    Etc.getpwuid(id.to_i)
  else
    Etc.getpwnam(id)
  end
end
new(opt) click to toggle source
# File lib/fluent/supervisor.rb, line 100
# Build a supervisor from an options hash.
#
# The keys largely mirror default_options. :dry_run, :inline_config and
# :suppress_config_dump are read here as well. Only the logger is created
# now; no config is read and no process is forked until #start.
#
# @param opt [Hash{Symbol=>Object}]
def initialize(opt)
  # process management
  @daemonize = opt[:daemonize]
  @supervise = opt[:supervise]
  @dry_run = opt[:dry_run]
  @chuser = opt[:chuser]
  @chgroup = opt[:chgroup]

  # configuration sources and plugin loading
  @config_path = opt[:config_path]
  @inline_config = opt[:inline_config]
  @use_v1_config = opt[:use_v1_config]
  @libs = opt[:libs]
  @plugin_dirs = opt[:plugin_dirs]

  # engine / logging behaviour
  @log_path = opt[:log_path]
  @log_level = opt[:log_level]
  @suppress_interval = opt[:suppress_interval]
  @suppress_config_dump = opt[:suppress_config_dump]
  @without_source = opt[:without_source]

  # runtime state
  @rpc_server = nil

  @log = LoggerInitializer.new(
    @log_path, @log_level, @chuser, @chgroup,
    {:suppress_repeated_stacktrace => opt[:suppress_repeated_stacktrace]}
  )
  @finished = false
  @main_pid = nil
end

Public Instance Methods

options() click to toggle source
# File lib/fluent/supervisor.rb, line 164
# Summary of the effective paths, keyed by String.
#
# @return [Hash{String=>Object}] keys: config_path, pid_file, plugin_dirs, log_path
def options
  summary = {}
  summary['config_path'] = @config_path
  # @daemonize holds the pid file path (wait_daemonize writes the pid there)
  summary['pid_file'] = @daemonize
  summary['plugin_dirs'] = @plugin_dirs
  summary['log_path'] = @log_path
  summary
end
start() click to toggle source
# File lib/fluent/supervisor.rb, line 125
# Entry point. Load the configuration, optionally daemonize, then run the
# engine in one of two ways:
#   * under a supervisor that keeps forking a fresh main process until
#     @finished is set (@supervise), or
#   * directly in this process (no supervision).
#
# In the unsupervised branch this never returns normally: the block ends
# with `exit 0`, and main_process itself ends with `exit! 1`.
def start
  @log.init
  read_config
  apply_system_config

  # dry_run always ends the process (exit 0 on success, exit 1 on failure)
  dry_run if @dry_run
  # Inside start_daemonize the original console process waits for the
  # daemon's pid and exits. Only the detached process continues past here.
  start_daemonize if @daemonize
  # NOTE(review): @rpc_endpoint is not assigned in initialize; presumably it
  # is set while applying the <system> section -- confirm in SystemConfig.
  setup_rpc_server if @rpc_endpoint
  if @supervise
    install_supervisor_signal_handlers
    run_rpc_server if @rpc_endpoint
    # Each iteration forks a new main process that runs the block below.
    # The loop ends once a shutdown signal or RPC sets @finished.
    until @finished
      supervise do
        change_privilege
        init_engine
        install_main_process_signal_handlers
        run_configure
        finish_daemonize if @daemonize
        run_engine
        exit 0
      end
      $log.error "fluentd main process died unexpectedly. restarting." unless @finished
    end
  else
    $log.info "starting fluentd-#{Fluent::VERSION} without supervision"
    run_rpc_server if @rpc_endpoint
    # Same steps as the supervised child, but run in this very process.
    main_process do
      change_privilege
      init_engine
      install_main_process_signal_handlers
      run_configure
      finish_daemonize if @daemonize
      run_engine
      exit 0
    end
  end
  # Only reachable from the supervised branch (see the note above).
  stop_rpc_server if @rpc_endpoint
end

Private Instance Methods

apply_system_config() click to toggle source

TODO: this method should be moved into a SystemConfig class method.

# File lib/fluent/supervisor.rb, line 446
# Apply the single optional <system> section of the parsed config (@conf)
# to this supervisor.
#
# Does nothing when there is no <system> section.
#
# @raise [ConfigError] when more than one <system> section is present
def apply_system_config
  system_sections = @conf.elements.select { |element| element.name == 'system' }

  case system_sections.size
  when 0
    nil
  when 1
    SystemConfig.new(system_sections.first).apply(self)
  else
    raise ConfigError, "<system> is duplicated. <system> should be only one"
  end
end
change_privilege() click to toggle source
# File lib/fluent/supervisor.rb, line 460
# Drop privileges to @chgroup and/or @chuser. Each may be a name or a
# numeric id (see Supervisor.get_etc_group / get_etc_passwd).
#
# The group is switched before the user. Presumably this is because an
# unprivileged uid could no longer change the gid.
#
# When @chuser is set, the supplementary group list is extended with the
# user's primary gid plus every group that lists the user as a member.
# NOTE(review): the new list is OR-ed with the current Process.groups, so
# any supplementary groups the process already had (e.g. root's when started
# as root) are kept after the switch -- confirm this is intended.
def change_privilege
  if @chgroup
    etc_group = Supervisor.get_etc_group(@chgroup)
    Process::GID.change_privilege(etc_group.gid)
  end

  if @chuser
    etc_pw = Supervisor.get_etc_passwd(@chuser)
    user_groups = [etc_pw.gid]
    # rewind the group database before scanning it
    Etc.setgrent
    Etc.group { |gr| user_groups << gr.gid if gr.mem.include?(etc_pw.name) } # emulate 'id -G'

    Process.groups = Process.groups | user_groups
    Process::UID.change_privilege(etc_pw.uid)
  end
end
dry_run() click to toggle source
# File lib/fluent/supervisor.rb, line 175
# Load and configure everything exactly as the main process would, then
# exit without running the engine.
#
# Exits 0 on success. On any StandardError it logs "dry run failed: ..."
# and exits 1.
def dry_run
  $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode"

  steps = [:change_privilege, :init_engine, :install_main_process_signal_handlers, :run_configure]
  steps.each { |step| send(step) }
  exit 0
rescue StandardError => error
  $log.error "dry run failed: #{error}"
  exit 1
end
finish_daemonize() click to toggle source
# File lib/fluent/supervisor.rb, line 230
# Finish daemonizing, if a daemonize pipe is still held.
#
# Redirects the standard streams to /dev/null, then reports @supervisor_pid
# through the pipe (the console process reads it in wait_daemonize). The
# pipe is closed afterwards. Does nothing when there is no pipe.
def finish_daemonize
  return unless @wait_daemonize_pipe_w

  STDIN.reopen("/dev/null")
  [STDOUT, STDERR].each { |stream| stream.reopen("/dev/null", "w") }

  @wait_daemonize_pipe_w.write(@supervisor_pid.to_s)
  @wait_daemonize_pipe_w.close
  @wait_daemonize_pipe_w = nil
end
init_engine() click to toggle source
# File lib/fluent/supervisor.rb, line 477
# Initialize Fluent::Engine, require the extra libraries, and load plugins
# from each configured plugin directory that exists. Missing directories are
# skipped silently. Existing ones are expanded to absolute paths before
# loading.
def init_engine
  Fluent::Engine.init({
    :suppress_interval => @suppress_interval,
    :suppress_config_dump => @suppress_config_dump,
    :without_source => @without_source,
  })

  @libs.each { |lib| require lib }

  @plugin_dirs.each do |plugin_dir|
    next unless Dir.exist?(plugin_dir)
    Fluent::Engine.load_plugin_dir(File.expand_path(plugin_dir))
  end
end
install_main_process_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 493
# Install the signal handlers used inside the main (engine) process.
#
#   INT / TERM - set @finished and stop Fluent::Engine. The @finished check
#                makes the stop happen at most once, even though these
#                handlers are not strictly thread safe.
#   HUP        - only logged for now.
#   USR1       - reopen the log, then flush buffers on a helper thread.
def install_main_process_signal_handlers
  [:INT, :TERM].each do |signame|
    trap(signame) do
      $log.debug "fluentd main process get SIG#{signame}"
      next if @finished
      @finished = true
      $log.debug "getting start to shutdown main process"
      Fluent::Engine.stop
    end
  end

  trap(:HUP) do
    # TODO: nothing beyond logging yet
    $log.debug "fluentd main process get SIGHUP"
  end

  trap(:USR1) do
    $log.debug "fluentd main process get SIGUSR1"
    $log.info "force flushing buffered events"
    @log.reopen!

    # A mutex cannot be locked by the main thread in trap context, so the
    # flush runs on its own thread. Every Exception is caught there and only
    # logged as a warning.
    flusher = Thread.new do
      begin
        Fluent::Engine.flush!
        $log.debug "flushing thread: flushed"
      rescue Exception => e
        $log.warn "flushing thread error: #{e}"
      end
    end
    flusher.run
  end
end
install_supervisor_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 330
# Install the supervisor's signal handlers. Each signal is logged and then
# routed to its supervisor_*_handler. SIGHUP additionally logs "restarting".
#
# Returns whatever the last trap call returned (the previous USR1 handler).
def install_supervisor_signal_handlers
  routes = {
    :INT  => :supervisor_sigint_handler,
    :TERM => :supervisor_sigterm_handler,
    :HUP  => :supervisor_sighup_handler,
    :USR1 => :supervisor_sigusr1_handler,
  }

  previous = nil
  routes.each do |signame, handler|
    previous = trap(signame) do
      $log.debug "fluentd supervisor process get SIG#{signame}"
      $log.info "restarting" if signame == :HUP
      send(handler)
    end
  end
  previous
end
main_process(&block) click to toggle source
# File lib/fluent/supervisor.rb, line 304
# Run +block+ as the body of the main process, and make sure the process
# never falls through into the caller afterwards.
#
# The block is expected to end the process itself (the callers in #start
# finish with `exit 0`). SystemExit raised that way propagates untouched.
# Any error that escapes the block is logged to $log. When $log does not
# already write to stdout, it is also echoed to a throw-away stdout logger.
# The process then ends with `exit! 1`, which skips at_exit handlers.
#
# ScriptError (LoadError, SyntaxError, NotImplementedError) is rescued along
# with StandardError. It is not a StandardError, yet it is exactly what a
# failing `require` of a library or plugin file raises. Before this fix it
# bypassed both the log and `exit! 1`.
def main_process(&block)
  begin
    block.call

  rescue Fluent::ConfigError
    $log.error "config error", :file=>@config_path, :error=>$!.to_s
    $log.debug_backtrace
    unless @log.stdout?
      console = Fluent::Log.new(STDOUT, @log_level).enable_debug
      console.error "config error", :file=>@config_path, :error=>$!.to_s
      console.debug_backtrace
    end

  rescue StandardError, ScriptError
    $log.error "unexpected error", :error=>$!.to_s
    $log.error_backtrace
    unless @log.stdout?
      console = Fluent::Log.new(STDOUT, @log_level).enable_debug
      console.error "unexpected error", :error=>$!.to_s
      console.error_backtrace
    end
  end

  exit! 1
end
read_config() click to toggle source
# File lib/fluent/supervisor.rb, line 402
# Read the config file at @config_path and parse it into @conf.
#
# If @inline_config is set, it is appended after a newline:
#   '-'       -> the rest of STDIN is read and appended
#   otherwise -> the string itself, with literal "\n" sequences turned into
#                real newlines
#
# Also records @config_fname, @config_basedir and the raw @config_data.
def read_config
  $log.info "reading config file", :path => @config_path
  @config_fname = File.basename(@config_path)
  @config_basedir = File.dirname(@config_path)
  @config_data = File.read(@config_path)

  if @inline_config
    @config_data << "\n"
    @config_data << (@inline_config == '-' ? STDIN.read : @inline_config.gsub("\\n", "\n"))
  end

  @conf = Fluent::Config.parse(@config_data, @config_fname, @config_basedir, @use_v1_config)
end
run_configure() click to toggle source
# File lib/fluent/supervisor.rb, line 456
# Hand the parsed configuration (@conf) to Fluent::Engine.
def run_configure
  parsed = @conf
  Fluent::Engine.run_configure(parsed)
end
run_engine() click to toggle source
# File lib/fluent/supervisor.rb, line 538
# Run Fluent::Engine and return whatever Fluent::Engine.run returns.
def run_engine
  engine = Fluent::Engine
  engine.run
end
run_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 268
# Start the RPC server created by setup_rpc_server.
def run_rpc_server
  server = @rpc_server
  server.start
end
setup_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 241
# Create the RPC server on @rpc_endpoint and mount the built-in endpoints.
#
# Each endpoint mirrors a supervisor signal:
#   /api/processes.interruptWorkers -> SIGINT handler
#   /api/processes.killWorkers      -> SIGTERM handler
#   /api/plugins.flushBuffers       -> SIGUSR1 handler
#   /api/config.reload              -> SIGHUP handler (also logs "restarting")
# Every endpoint callback returns nil.
def setup_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  routes = [
    ['/api/processes.interruptWorkers', :supervisor_sigint_handler],
    ['/api/processes.killWorkers', :supervisor_sigterm_handler],
    ['/api/plugins.flushBuffers', :supervisor_sigusr1_handler],
    ['/api/config.reload', :supervisor_sighup_handler],
  ]

  # Return the last mount_proc result, as the straight-line version did.
  routes.map { |path, handler|
    @rpc_server.mount_proc(path) { |_req, _res|
      $log.debug "fluentd RPC got #{path} request"
      $log.info "restarting" if handler == :supervisor_sighup_handler
      send(handler)
      nil
    }
  }.last
end
start_daemonize() click to toggle source
# File lib/fluent/supervisor.rb, line 188
# Detach from the terminal with a double fork. Three processes take part:
#   console process - the original process. It waits on the pipe for the
#                     daemon's pid (wait_daemonize), then exits 0.
#   intermediate    - calls setsid, forks again, and exits at once with exit!.
#   supervisor      - the grandchild. It records @supervisor_pid and returns
#                     to the caller.
# The write end of the pipe stays open in the supervisor. The pid is written
# to it later, in finish_daemonize.
def start_daemonize
  @wait_daemonize_pipe_r, @wait_daemonize_pipe_w = IO.pipe

  if fork
    # console process
    @wait_daemonize_pipe_w.close
    @wait_daemonize_pipe_w = nil
    wait_daemonize
    exit 0
  end

  # daemonize intermediate process
  @wait_daemonize_pipe_r.close
  @wait_daemonize_pipe_r = nil

  # Mark the write end close-on-exec so that programs exec'ed by child
  # processes (e.g. ones spawned during run_configure) do not inherit it.
  # An inherited write end would keep the console process's read from
  # reaching EOF.
  @wait_daemonize_pipe_w.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)

  Process.setsid
  exit!(0) if fork
  # NOTE(review): with umask 0, files later created without an explicit mode
  # (e.g. File.open(path, "w")) end up world-writable -- confirm intended.
  File.umask(0)

  # supervisor process
  @supervisor_pid = Process.pid
end
stop_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 272
# Shut down the RPC server created by setup_rpc_server.
def stop_rpc_server
  server = @rpc_server
  server.shutdown
end
supervise(&block) click to toggle source
# File lib/fluent/supervisor.rb, line 276
# Fork one main process that runs +block+ (via main_process) and wait for
# it to exit.
#
# When daemonizing, the supervisor redirects its own standard streams to
# /dev/null right after forking and closes its copy of the daemonize pipe.
#
# If the child exits within one second and no shutdown was requested
# (@finished is false), the supervisor exits with the child's exit code
# instead of returning.
#
# The exit code is the child's real exit status. Process::Status#to_i is
# the raw wait(2) status (exit code << 8), so `exit!(1)` in the child used to
# be logged as 256, and `exit 256` truncated to a successful 0. A child
# killed by a signal is reported as 128 + signal number, as shells do.
def supervise(&block)
  start_time = Time.now

  $log.info "starting fluentd-#{Fluent::VERSION}"
  @main_pid = fork do
    main_process(&block)
  end

  if @daemonize && @wait_daemonize_pipe_w
    STDIN.reopen("/dev/null")
    STDOUT.reopen("/dev/null", "w")
    STDERR.reopen("/dev/null", "w")
    @wait_daemonize_pipe_w.close
    @wait_daemonize_pipe_w = nil
  end

  Process.waitpid(@main_pid)
  @main_pid = nil
  status = $?
  # exitstatus is nil only when the child was terminated by a signal
  ecode = status.exitstatus || (128 + status.termsig.to_i)

  $log.info "process finished", :code=>ecode

  if !@finished && Time.now - start_time < 1
    $log.warn "process died within 1 second. exit."
    exit ecode
  end
end
supervisor_sighup_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 381
# Reload the configuration and, if a main process is running, send it
# SIGTERM so that the supervise loop starts a new one.
#
# The work runs on a new thread because the main thread cannot lock a mutex
# while in trap context. Returns that thread.
#
# If reading or applying the config fails, the error is logged via $log and
# the main process is left alone. Previously the exception silently killed
# the thread, which in daemon mode (stderr is /dev/null) left no trace.
# Errno::ESRCH from Process.kill is still deliberately not rescued, because
# it would indicate an invalid state.
def supervisor_sighup_handler
  Thread.new {
    begin
      read_config
      apply_system_config
    rescue => e
      $log.error "config reload failed; main process is not restarted", :error=>e.to_s
      $log.error_backtrace
      next
    end

    if pid = @main_pid
      Process.kill(:TERM, pid)
      # don't rescue Errno::ESRCH here (invalid status)
    end
  }.run
end
supervisor_sigint_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 353
# Request shutdown. Marks the supervisor as finished and forwards SIGINT to
# the main process, if one is recorded.
#
# The signal is sent unconditionally, and Errno::ESRCH (process already
# gone) is ignored. Do not probe with Process.waitpid(pid, WNOHANG):
#   * that call reaps an already-exited child, so supervise's blocking
#     Process.waitpid(@main_pid) then fails with Errno::ECHILD;
#   * it raises ECHILD itself if the child was already reaped.
# Signalling an exited but not yet reaped child is harmless.
def supervisor_sigint_handler
  @finished = true
  pid = @main_pid
  return unless pid

  begin
    Process.kill(:INT, pid)
  rescue Errno::ESRCH
    # the process is already gone
  end
end
supervisor_sigterm_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 367
# Request shutdown. Marks the supervisor as finished and forwards SIGTERM to
# the main process, if one is recorded.
#
# The signal is sent unconditionally, and Errno::ESRCH (process already
# gone) is ignored. Do not probe with Process.waitpid(pid, WNOHANG):
#   * that call reaps an already-exited child, so supervise's blocking
#     Process.waitpid(@main_pid) then fails with Errno::ECHILD;
#   * it raises ECHILD itself if the child was already reaped.
# Signalling an exited but not yet reaped child is harmless.
def supervisor_sigterm_handler
  @finished = true
  pid = @main_pid
  return unless pid

  begin
    Process.kill(:TERM, pid)
  rescue Errno::ESRCH
    # the process is already gone
  end
end
supervisor_sigusr1_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 394
# Reopen the supervisor's log, then forward SIGUSR1 to the main process,
# if one is recorded.
#
# Errno::ESRCH is deliberately not rescued here, because it would indicate
# an invalid state.
def supervisor_sigusr1_handler
  @log.reopen!
  pid = @main_pid
  Process.kill(:USR1, pid) if pid
end
wait_daemonize() click to toggle source
# File lib/fluent/supervisor.rb, line 214
# Console-process side of daemonization.
#
# Blocks until every write end of the daemonize pipe is closed, then writes
# whatever pid text was received to the pid file at @daemonize. If nothing
# was written, initialization is treated as failed and the process ends with
# `exit! 1`.
def wait_daemonize
  pid_text = @wait_daemonize_pipe_r.read
  exit!(1) if pid_text.empty?

  @wait_daemonize_pipe_r.close
  @wait_daemonize_pipe_r = nil

  File.write(@daemonize, pid_text)
end