AMQ::Client::Async::Consumer
AMQP consumers are entities that handle messages delivered to them by an AMQP broker (a "push API", as opposed to a "pull API"). Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or non-exclusive (consumers share the queue). When a queue has multiple consumers, messages are distributed among them in a round-robin manner, subject to the channel-level prefetch setting.
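To make the round-robin behavior concrete, here is a minimal pure-Ruby sketch. It is only a simplified model and not the broker's actual algorithm: prefetch limits and acknowledgements are ignored, and `distribute_round_robin` is a hypothetical helper that does not exist in the library.

```ruby
# Simplified model of round-robin delivery. NOT the broker's real algorithm:
# prefetch limits and acknowledgements are deliberately ignored.
# distribute_round_robin is a hypothetical helper written for this illustration.
def distribute_round_robin(messages, consumer_names)
  # Start every consumer with an empty list so idle consumers are visible too.
  assignments = consumer_names.each_with_object({}) { |name, result| result[name] = [] }

  messages.each_with_index do |message, index|
    # Each message goes to the next consumer in turn, wrapping around.
    name = consumer_names[index % consumer_names.size]
    assignments[name] << message
  end

  assignments
end

distribute_round_robin(%w[m1 m2 m3 m4 m5], %w[consumer-a consumer-b]).each do |name, messages|
  puts "#{name}: #{messages.join(', ')}"
end
```

With a real broker, a consumer that has reached its prefetch limit would be skipped until it acknowledges messages, which this model does not attempt to show.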
@see AMQP::Queue @see AMQP::Queue#subscribe @see AMQP::Queue#cancel
# File lib/amqp/consumer.rb, line 41
def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false)
  super(channel, queue, (consumer_tag || self.class.tag_generator.generate_for(queue)), exclusive, no_ack, arguments, no_local)
end
@return [AMQ::Client::ConsumerTagGenerator] Consumer tag generator
# File lib/amqp/consumer.rb, line 30
def self.tag_generator
  @tag_generator ||= AMQ::Client::ConsumerTagGenerator.new
end
@param [AMQ::Client::ConsumerTagGenerator] Assigns consumer tag generator that will be used by consumer instances
@return [AMQ::Client::ConsumerTagGenerator] Provided argument
# File lib/amqp/consumer.rb, line 36
def self.tag_generator=(generator)
  @tag_generator = generator
end
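The interplay between the class-level tag generator and the constructor can be sketched without a broker. The classes below (`SketchQueue`, `PrefixedTagGenerator`, `SketchConsumer`) are hypothetical stand-ins written for this illustration. They only mimic the shape shown in the source above: a memoized class-level generator that responds to `generate_for(queue)` and is consulted only when no explicit consumer tag is given.

```ruby
# Hypothetical stand-ins: none of these classes exist in the library.
class SketchQueue
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# A custom generator only needs to respond to generate_for(queue).
class PrefixedTagGenerator
  def initialize(prefix)
    @prefix  = prefix
    @counter = 0
  end

  def generate_for(queue)
    @counter += 1
    "#{@prefix}-#{queue.name}-#{@counter}"
  end
end

class SketchConsumer
  class << self
    # Mirrors self.tag_generator= from the listing above.
    attr_writer :tag_generator

    # Mirrors self.tag_generator: lazily creates a default generator.
    def tag_generator
      @tag_generator ||= PrefixedTagGenerator.new("default")
    end
  end

  attr_reader :consumer_tag

  def initialize(queue, consumer_tag = nil)
    # An explicit tag wins; otherwise ask the class-level generator.
    @consumer_tag = consumer_tag || self.class.tag_generator.generate_for(queue)
  end
end

SketchConsumer.tag_generator = PrefixedTagGenerator.new("billing")
invoices = SketchQueue.new("invoices")
puts SketchConsumer.new(invoices).consumer_tag           # generated by the custom generator
puts SketchConsumer.new(invoices, "my-tag").consumer_tag # explicit tag is used as-is
```

Because the generator is stored on the class, replacing it affects every consumer created afterwards that does not pass its own tag.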
Acknowledge a delivery tag. @return [Consumer] self
@api public @see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
# File lib/amqp/consumer.rb, line 142
def acknowledge(delivery_tag)
  super(delivery_tag)
end
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
@api plugin
# File lib/amqp/consumer.rb, line 193
def auto_recover
  super
end
Defines a callback that will be executed after the TCP connection is recovered following a network failure, but before the AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces any previously added ones).
@api public
# File lib/amqp/consumer.rb, line 175
def before_recovery(&block)
  super(&block)
end
Legacy {AMQP::Queue} API compatibility. @private @deprecated
# File lib/amqp/consumer.rb, line 103
def callback
  if @callbacks[:delivery]
    @callbacks[:delivery].first
  end
end
@return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 82
def cancel(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      super(nowait, &block)
    end
  end

  self
end
Begin consuming messages from the queue @return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 54
def consume(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      super(nowait, &block)
    end
  end

  self
end
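The nesting of `once_open` and `once_declared` in the listing above means the actual consume request is deferred until both the channel and the queue are ready, while `consume` itself returns immediately. The following is a rough sketch of that "run now if ready, otherwise queue it" pattern. `SketchDeferrable`, `once_ready` and `ready!` are hypothetical names for this illustration and are not part of the library, whose internal bookkeeping may differ.

```ruby
# Hypothetical model of an entity that defers work until it becomes ready.
class SketchDeferrable
  def initialize
    @ready   = false
    @pending = []
  end

  # Runs the block immediately when ready, otherwise stores it for later.
  def once_ready(&block)
    if @ready
      block.call
    else
      @pending << block
    end
    self
  end

  # Marks the entity as ready and runs everything that was waiting, in order.
  def ready!
    @ready = true
    @pending.shift.call until @pending.empty?
    self
  end
end

sketch_channel = SketchDeferrable.new
sketch_queue   = SketchDeferrable.new
log = []

# Same nesting as in #consume: wait for the channel, then for the queue.
sketch_channel.once_ready { sketch_queue.once_ready { log << "consume request sent" } }
log << "consume returned"

sketch_channel.ready! # channel opened
sketch_queue.ready!   # queue declared
puts log
```

This ordering is why callers can chain further calls on the returned consumer before the broker has confirmed anything.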
@return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)
# File lib/amqp/consumer.rb, line 46
def exclusive?
  super
end
@return [String] Readable representation of relevant object state.
# File lib/amqp/consumer.rb, line 201
def inspect
  "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
end
Defines a callback that will be executed after the TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces any previously added ones).
@api public
# File lib/amqp/consumer.rb, line 164
def on_connection_interruption(&block)
  super(&block)
end
Register a block that will be used to handle delivered messages.
@return [AMQP::Consumer] self @see AMQP::Queue#subscribe
# File lib/amqp/consumer.rb, line 114
def on_delivery(&block)
  # We have to maintain this multiple arities jazz
  # because older versions of this gem are used in examples in at least 3
  # books published by O'Reilly :(. MK.
  delivery_shim = Proc.new { |basic_deliver, headers, payload|
    case block.arity
    when 1 then
      block.call(payload)
    when 2 then
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload)
    else
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
    end
  }

  super(&delivery_shim)
end
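The arity-based dispatch in `on_delivery` can be tried in isolation. The sketch below is a simplified imitation and not the library's code: `SketchHeader` and `build_delivery_shim` are hypothetical names, and the catch-all branch passes fewer extra arguments than the real method does.

```ruby
# Hypothetical stand-in for the metadata object passed to handlers.
SketchHeader = Struct.new(:delivery_tag, :routing_key)

# Wraps a handler so that it is called with as many arguments as it declares.
def build_delivery_shim(&handler)
  proc do |header, payload|
    case handler.arity
    when 1 then handler.call(payload)          # payload-only handlers
    when 2 then handler.call(header, payload)  # metadata plus payload
    else handler.call(header, payload, header.delivery_tag, header.routing_key)
    end
  end
end

header = SketchHeader.new(7, "orders.created")

payload_only = build_delivery_shim { |payload| "payload=#{payload}" }
with_header  = build_delivery_shim { |metadata, payload| "tag=#{metadata.delivery_tag} payload=#{payload}" }

puts payload_only.call(header, "hello")
puts with_header.call(header, "hello")
```

Checking `arity` lets one registration method serve handlers written against older and newer calling conventions, at the cost of a slightly opaque contract.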
Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces any previously added ones).
@api public
# File lib/amqp/consumer.rb, line 183
def on_recovery(&block)
  super(&block)
end
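The "last definition wins" rule for recovery-related callbacks, and the order in which they are described as firing (connection interruption, then TCP recovery, then AMQP recovery), can be modelled with one slot per event. `SketchRecoveryCallbacks` and `simulate_network_failure` are hypothetical and exist only in this sketch; the library's internal storage may well differ.

```ruby
# Hypothetical model: one slot per event, so redefining a callback replaces the old one.
class SketchRecoveryCallbacks
  EVENT_ORDER = [:interruption, :before_recovery, :after_recovery].freeze

  def initialize
    @callbacks = {}
  end

  def on_connection_interruption(&block)
    @callbacks[:interruption] = block
    self
  end

  def before_recovery(&block)
    @callbacks[:before_recovery] = block
    self
  end

  def on_recovery(&block)
    @callbacks[:after_recovery] = block
    self
  end

  # Fires the defined callbacks in the documented order and collects their results.
  def simulate_network_failure
    results = EVENT_ORDER.map do |event|
      callback = @callbacks[event]
      callback && callback.call
    end
    results.compact
  end
end

callbacks = SketchRecoveryCallbacks.new
callbacks.on_recovery { "first handler" }
callbacks.on_recovery { "second handler" } # replaces the first handler
callbacks.before_recovery { "TCP connection is back" }
puts callbacks.simulate_network_failure
```

Code that needs several reactions to the same event would have to combine them inside a single block.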
Reject a delivery tag, optionally requeueing the message. @return [Consumer] self
@api public @see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
# File lib/amqp/consumer.rb, line 151
def reject(delivery_tag, requeue = true)
  super(delivery_tag, requeue)
end
Used by automatic recovery code. @api plugin @return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 67
def resubscribe(&block)
  @channel.once_open do
    @queue.once_declared do
      self.unregister_with_channel
      @consumer_tag = self.class.tag_generator.generate_for(@queue)
      self.register_with_channel

      super(&block)
    end
  end

  self
end