class Fluent::FileBuffer
Constants
- PATH_MATCH
Attributes
symlink_path[RW]
Public Class Methods
clear_buffer_paths()
click to toggle source
# File lib/fluent/test/input_test.rb, line 20
# Resets the class-level registry of buffer paths to an empty hash.
#
# The registry (@@buffer_paths) is what #configure consults to reject a
# buffer_path that has already been claimed, so clearing it allows the same
# path to be configured again.
def self.clear_buffer_paths
  @@buffer_paths = {}
end
new()
click to toggle source
Calls superclass method
Fluent::BasicBuffer.new
# File lib/fluent/plugin/buf_file.rb, line 81
# Loads the 'uri' library and then delegates to the superclass initializer.
#
# NOTE(review): the require presumably exists for the URI calls made by
# #encode_key / #decode_key; it is deliberately left inside the method.
def initialize
  require "uri"
  super()
end
Public Instance Methods
before_shutdown(out)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 175
# Flushes everything before shutdown when @flush_at_shutdown is set.
#
# Inside a single synchronize block, every key currently in @map is passed to
# push, and then pop(out) is called repeatedly until it returns a falsy value.
#
# @param out the object handed to each pop call
# @return [nil] when @flush_at_shutdown is falsy; otherwise whatever
#   synchronize returns
def before_shutdown(out)
  return unless @flush_at_shutdown

  synchronize do
    @map.each_key { |chunk_key| push(chunk_key) }
    # Keep popping until pop reports there is nothing left.
    nil while pop(out)
  end
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::BasicBuffer#configure
# File lib/fluent/plugin/buf_file.rb, line 91
# Applies the configuration, claims @buffer_path in the class-level registry
# and derives the prefix/suffix used to build chunk file names.
#
# When @buffer_path contains a '*', the text before the first '*' becomes
# @buffer_path_prefix and the text after it becomes @buffer_path_suffix.
# Otherwise the prefix is the path followed by "." and the suffix is ".log".
#
# @param conf configuration element; read with conf['@type'] and conf['type']
# @raise [ConfigError] if the same @buffer_path is already registered
def configure(conf)
  super

  plugin_type = conf['@type'] || conf['type']
  if @@buffer_paths.key?(@buffer_path)
    raise ConfigError, "Other '#{@@buffer_paths[@buffer_path]}' plugin already use same buffer_path: type = #{plugin_type}, buffer_path = #{@buffer_path}"
  end
  @@buffer_paths[@buffer_path] = plugin_type

  star_at = @buffer_path.index('*')
  if star_at.nil?
    @buffer_path_prefix = @buffer_path + "."
    @buffer_path_suffix = ".log"
  else
    @buffer_path_prefix = @buffer_path[0, star_at]
    @buffer_path_suffix = @buffer_path[(star_at + 1)..-1]
  end
end
enqueue(chunk)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 163
# Renames a chunk's file so that its name carries the ".q" marker.
#
# The part of chunk.path between @buffer_path_prefix and @buffer_path_suffix
# is matched against PATH_MATCH; capture 1 (the encoded key) and capture 3
# (the timestamp suffix) are reused to build the new path, which is handed to
# chunk.mv.
#
# @param chunk object responding to #path and #mv
# @return whatever chunk.mv returns
def enqueue(chunk)
  current_path = chunk.path
  name_range = @buffer_path_prefix.length..-(@buffer_path_suffix.length + 1)
  matched = PATH_MATCH.match(current_path[name_range])

  key_part = matched ? matched[1] : ""
  # NOTE(review): unlike key_part, this lookup is not guarded, so a path that
  # does not match PATH_MATCH raises NoMethodError here. Confirm the intent.
  suffix_part = matched[3]

  chunk.mv("#{@buffer_path_prefix}#{key_part}.q#{suffix_part}#{@buffer_path_suffix}")
end
new_chunk(key)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 117
# Creates a fresh FileBufferChunk for the given key.
#
# The key is encoded with #encode_key, a "b"-marked path is generated with
# #make_path, and the chunk is opened in "a+" mode with @symlink_path.
#
# @param key the buffer key the chunk belongs to
# @return [FileBufferChunk]
def new_chunk(key)
  chunk_path, tsuffix = make_path(encode_key(key), "b")
  FileBufferChunk.new(key, chunk_path, tsuffix_to_unique_id(tsuffix), "a+", @symlink_path)
end
resume()
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 124
# Rebuilds the buffer state from chunk files found on disk.
#
# Every file matching "<prefix>*<suffix>" whose middle part matches PATH_MATCH
# is wrapped in a FileBufferChunk: files marked 'b' are opened in "a+" mode
# and go into the map, files marked 'q' are opened in "r" mode and go into the
# queue. Both groups are ordered by the hexadecimal timestamp in the file
# name; when several 'b' chunks share a key, the one with the largest
# timestamp ends up in the map.
#
# @return [Array] a two-element array: the queue (Array of chunks) and the
#   map (Hash of chunk.key => chunk)
def resume
  staged = []  # [timestamp, chunk] pairs for 'b' files
  queued = []  # [timestamp, chunk] pairs for 'q' files

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") do |path|
    name_part = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length + 1)]
    m = PATH_MATCH.match(name_part)
    next unless m

    key = decode_key(m[1])
    tsuffix = m[3]
    timestamp = tsuffix.to_i(16)
    unique_id = tsuffix_to_unique_id(tsuffix)

    case m[2]
    when 'b'
      staged << [timestamp, FileBufferChunk.new(key, path, unique_id, "a+")]
    when 'q'
      queued << [timestamp, FileBufferChunk.new(key, path, unique_id, "r")]
    end
  end

  by_timestamp = ->(pair) { pair[0] }

  # Later timestamps overwrite earlier ones for the same key.
  map = staged.sort_by(&by_timestamp).each_with_object({}) do |(_timestamp, chunk), acc|
    acc[chunk.key] = chunk
  end
  queue = queued.sort_by(&by_timestamp).map { |(_timestamp, chunk)| chunk }

  [queue, map]
end
start()
click to toggle source
Calls superclass method
Fluent::BasicBuffer#start
# File lib/fluent/plugin/buf_file.rb, line 110
# Ensures the directory that will hold the chunk files exists, then calls the
# superclass #start.
#
# "path" is appended to the prefix only so that File.dirname yields the
# directory portion even when the prefix itself ends with a separator.
def start
  buffer_dir = File.dirname(@buffer_path_prefix + "path")
  FileUtils.mkdir_p(buffer_dir, mode: DEFAULT_DIR_PERMISSION)
  super
end
Protected Instance Methods
decode_key(encoded_key)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 193
# Reverses #encode_key: turns %XX escape sequences back into raw bytes.
#
# Uses URI::DEFAULT_PARSER#unescape rather than the module-level URI.unescape,
# which was removed in Ruby 3.0 and would raise NoMethodError there.
#
# @param encoded_key [String] key portion taken from a chunk file name
# @return [String] the decoded key
def decode_key(encoded_key)
  URI::DEFAULT_PARSER.unescape(encoded_key)
end
encode_key(key)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 189
# Percent-encodes a buffer key so it can be embedded in a chunk file name.
#
# Every character outside [-_.a-zA-Z0-9] is replaced by its %XX escape.
# Uses URI::DEFAULT_PARSER#escape rather than the module-level URI.escape,
# which was removed in Ruby 3.0 and would raise NoMethodError there.
#
# @param key [String] the buffer key
# @return [String] the encoded key
def encode_key(key)
  URI::DEFAULT_PARSER.escape(key, /[^-_.a-zA-Z0-9]/n)
end
make_path(encoded_key, bq)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 197
# Builds a new chunk file path together with its timestamp suffix.
#
# The suffix is the hexadecimal form of an integer whose upper bits are the
# current UTC time in microseconds since the epoch and whose lowest 12 bits
# are random.
#
# @param encoded_key [String] key already encoded for use in a file name
# @param bq [String] marker placed before the suffix (callers in this class
#   pass "b")
# @return [Array(String, String)] the full path and the hexadecimal suffix
def make_path(encoded_key, bq)
  now = Time.now.utc
  usec_since_epoch = now.to_i * 1000 * 1000 + now.usec
  # rand(0x1000) covers the whole 12-bit range 0x000..0xfff; the previous
  # rand(0xfff) could never produce 0xfff.
  timestamp = (usec_since_epoch << 12) | rand(0x1000)
  tsuffix = timestamp.to_s(16)
  path = "#{@buffer_path_prefix}#{encoded_key}.#{bq}#{tsuffix}#{@buffer_path_suffix}"
  return path, tsuffix
end
tsuffix_to_unique_id(tsuffix)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 205
# Converts a hexadecimal timestamp suffix into a binary unique id.
#
# The suffix is read two characters at a time (a trailing odd character is
# ignored), each pair is parsed as a hexadecimal byte, and the resulting byte
# string is repeated twice.
#
# @param tsuffix [String] hexadecimal suffix taken from a chunk file name
# @return [String] the packed bytes, doubled
def tsuffix_to_unique_id(tsuffix)
  byte_values = tsuffix.scan(/../).map { |hex_pair| hex_pair.to_i(16) }
  packed = byte_values.pack('C*')
  packed * 2
end