class Fluent::RoundRobinOutput
Attributes
outputs[R]
rand_seed[RW]
weights[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Output.new
# File lib/fluent/plugin/out_roundrobin.rb, line 21 def initialize super @outputs = [] @weights = [] end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Output#configure
# File lib/fluent/plugin/out_roundrobin.rb, line 31 def configure(conf) super conf.elements.select {|e| e.name == 'store' }.each {|e| type = e['@type'] || e['type'] unless type raise ConfigError, "Missing 'type' parameter on <store> directive" end weight = e['weight'] weight = weight ? weight.to_i : 1 log.debug "adding store type=#{type.dump}, weight=#{weight}" output = Plugin.new_output(type) output.router = router output.configure(e) @outputs << output @weights << weight } @rr = -1 # starts from @output[0] @rand_seed = Random.new.seed end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_roundrobin.rb, line 70 def emit(tag, es, chain) next_output.emit(tag, es, chain) end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_roundrobin.rb, line 64 def shutdown @outputs.each {|o| o.shutdown } end
start()
click to toggle source
# File lib/fluent/plugin/out_roundrobin.rb, line 56 def start rebuild_weight_array @outputs.each {|o| o.start } end
Private Instance Methods
next_output()
click to toggle source
# File lib/fluent/plugin/out_roundrobin.rb, line 76 def next_output @rr = 0 if (@rr += 1) >= @weight_array.size @weight_array[@rr] end
rebuild_weight_array()
click to toggle source
# File lib/fluent/plugin/out_roundrobin.rb, line 81 def rebuild_weight_array gcd = @weights.inject(0) {|r,w| r.gcd(w) } weight_array = [] @outputs.zip(@weights).each {|output,weight| (weight / gcd).times { weight_array << output } } # don't randomize order if all weight is 1 (=default) if @weights.any? {|w| w > 1 } r = Random.new(@rand_seed) weight_array.sort_by! { r.rand } end @weight_array = weight_array end