class ActiveMessaging::Adapters::AmazonSqs::Connection
Constants
- GET_QUEUE_ATTRIBUTES
- NUMBER_OF_MESSAGES
- QUEUE_NAME_LENGTH
- SET_QUEUE_ATTRIBUTES
- VISIBILITY_TIMEOUT
Attributes
configurable params
configurable params
configurable params
configurable params
configurable params
configurable params
configurable params
configurable params
configurable params
configurable params
Public Class Methods
generic init method needed by a13g
# File lib/activemessaging/adapters/asqs.rb, line 30
# Build a connection from a configuration hash.
#
# Required keys: :access_key_id and :secret_access_key (both must be non-empty).
# Optional keys and their defaults:
#   :requestExpires (10), :requestRetryCount (5), :aws_version ('2008-01-01'),
#   :content_type ('text/plain'), :host ('queue.amazonaws.com'), :port (80),
#   :protocol ('http'), :poll_interval (1), :reconnectDelay (5),
#   :max_message_size (8, used whenever the given value is not a positive
#   integer), :cache_queue_list (true), :reliable (true).
#
# Raises a RuntimeError when either credential is missing or empty.
def initialize(cfg)
  key_id = cfg[:access_key_id]
  raise "Must specify a access_key_id" if key_id.nil? || key_id.empty?

  secret = cfg[:secret_access_key]
  raise "Must specify a secret_access_key" if secret.nil? || secret.empty?

  @access_key_id     = key_id
  @secret_access_key = secret

  # request behaviour
  @request_expires     = cfg[:requestExpires] || 10
  @request_retry_count = cfg[:requestRetryCount] || 5
  @aws_version         = cfg[:aws_version] || '2008-01-01'
  @content_type        = cfg[:content_type] || 'text/plain'

  # endpoint
  @host     = cfg[:host] || 'queue.amazonaws.com'
  @port     = cfg[:port] || 80
  @protocol = cfg[:protocol] || 'http'

  # polling / recovery
  @poll_interval   = cfg[:poll_interval] || 1
  @reconnect_delay = cfg[:reconnectDelay] || 5

  # nil or non-positive sizes fall back to 8
  requested_size    = cfg[:max_message_size].to_i
  @max_message_size = requested_size > 0 ? requested_size : 8

  @aws_url = "#{@protocol}://#{@host}"

  # both flags default to true only when the key is absent (an explicit false is kept)
  cache_flag        = cfg[:cache_queue_list]
  @cache_queue_list = cache_flag.nil? ? true : cache_flag
  reliable_flag     = cfg[:reliable]
  @reliable         = reliable_flag.nil? ? true : reliable_flag

  # subscription bookkeeping starts out empty
  @subscriptions        = {}
  @queues_by_priority   = {}
  @current_subscription = 0

  # load the queue list up front
  queues
end
Public Instance Methods
# File lib/activemessaging/adapters/asqs.rb, line 60
# Nothing to tear down: every operation is an individual HTTP request,
# so this always reports success.
def disconnect
  true
end
new receive respects priorities
# File lib/activemessaging/adapters/asqs.rb, line 115
# Fetch at most one message, visiting priorities in ascending sorted order.
#
# options[:priorities] - optional list; when given, only priorities whose
#                        to_i value is in the list are polled.
#
# Within a priority the queues are visited in a freshly shuffled order on
# every call. Queues with no known queue object or no subscription are
# skipped. Returns the first message found, or nil when nothing was received.
def receive(options = {})
  wanted = options[:priorities]

  @queues_by_priority.keys.sort.each do |priority|
    next if wanted && !wanted.include?(priority.to_i)

    @queues_by_priority[priority].shuffle.each do |queue_name|
      queue        = queues[queue_name]
      subscription = @subscriptions[queue_name]
      next if queue.nil? || subscription.nil?

      batch = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])
      next unless batch && !batch.empty?

      # stop at the first real message
      found = batch[0]
      return found if found
    end
  end

  nil
end
# NOTE(review): this is a second definition of `receive` on this page. It walks @subscriptions.keys.sort round-robin via @current_subscription, sleeps poll_interval each time the index returns to where the call started, and returns the first non-nil message. Presumably this is the older implementation kept as commented-out code in asqs.rb (the priority-aware listing is labelled "new receive") - confirm before treating it as live code.
# receive a single message from any of the subscribed queues # check each queue once, then sleep for #poll_interval def receive
raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?) start = @current_subscription while true # puts "calling receive..." @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0) sleep poll_interval if (@current_subscription == start) queue_name = @subscriptions.keys.sort[@current_subscription] queue = queues[queue_name] subscription = @subscriptions[queue_name] unless queue.nil? messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout] return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?) end end
end
# File lib/activemessaging/adapters/asqs.rb, line 169
# Acknowledge a handled message by deleting it from its queue.
#
# message - the message to delete (passed straight to delete_message).
# headers - accepted for interface compatibility; not used here.
#
# Deletion is best effort: any StandardError is logged and ignored. Only
# StandardError is rescued, so interrupts, exit requests and other
# non-standard exceptions still propagate to the caller.
def received(message, headers = {})
  delete_message(message)
rescue StandardError => exception
  logger.error "Exception in ActiveMessaging::Adapters::AmazonSqs::Connection.received() logged and ignored: "
  logger.error exception
end
queue_name string, body string, headers hash. Send a single message to a queue.
# File lib/activemessaging/adapters/asqs.rb, line 95
# Deliver one message body to the named queue; the queue is looked up or
# created through get_or_create_queue first.
#
# queue_name      - name of the destination queue.
# message_body    - the text to send.
# message_headers - accepted for interface compatibility; not used here.
#
# Returns whatever send_messsage returns.
def send(queue_name, message_body, message_headers = {})
  send_messsage(get_or_create_queue(queue_name), message_body)
end
queue_name string, headers hash. For SQS, make sure the queue exists (creating it if it does not), then add it to the list of polled queues.
# File lib/activemessaging/adapters/asqs.rb, line 67
# Register interest in a queue so that receive will poll it.
#
# queue_name      - name of the queue; it is looked up or created via
#                   get_or_create_queue.
# message_headers - passed to Subscription.new for a first-time subscription.
#
# A repeat subscription to the same queue calls #add on the existing
# Subscription instead of replacing it. The queue name is then filed under
# its subscription's priority in @queues_by_priority, at most once.
def subscribe(queue_name, message_headers = {})
  name = get_or_create_queue(queue_name).name

  if @subscriptions.key?(name)
    @subscriptions[name].add
  else
    @subscriptions[name] = Subscription.new(name, message_headers)
  end

  priority = @subscriptions[name].priority
  @queues_by_priority[priority] = [] unless @queues_by_priority.key?(priority)

  bucket = @queues_by_priority[priority]
  bucket << name unless bucket.include?(name)
end
Do nothing; because the message is not deleted, it will eventually become visible again.
# File lib/activemessaging/adapters/asqs.rb, line 179
# Deliberate no-op that always returns true; neither argument is used.
def unreceive(message, headers = {})
  true
end
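queue_name string, headers hash. For SQS, release one subscription to the queue; once no subscriptions remain, the queue is removed from the list of polled queues. (The original note read "attempt delete the queues, won't work if not empty, that's ok", but the code shown here never deletes the queue itself.)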
# File lib/activemessaging/adapters/asqs.rb, line 83
# Release one subscription to a queue.
#
# queue_name      - key of the subscription in @subscriptions.
# message_headers - accepted for interface compatibility; not used here.
#
# Calls #remove on the subscription. When its count drops to zero or below,
# the subscription is deleted and the queue name is taken out of its
# priority bucket. Unknown queue names are ignored.
def unsubscribe(queue_name, message_headers = {})
  subscription = @subscriptions[queue_name]
  return unless subscription

  subscription.remove
  return if subscription.count > 0

  dropped = @subscriptions.delete(queue_name)
  @queues_by_priority[dropped.priority].delete(queue_name)
end
Protected Instance Methods
# File lib/activemessaging/adapters/asqs.rb, line 310
# Validate a response object.
#
# Raises "http response was nil" for a nil response, and raises
# response.errors when response.errors? is truthy. Otherwise the response
# is handed back unchanged.
def check_errors(response)
  if response.nil?
    raise "http response was nil"
  elsif response && response.errors?
    raise response.errors
  end

  response
end
# File lib/activemessaging/adapters/asqs.rb, line 185
# Create a queue with the given name.
#
# The name is checked with validate_new_queue first. Issues a CreateQueue
# request and registers the returned //QueueUrl through add_queue.
# Returns add_queue's result, or nil when make_request returned nil.
def create_queue(name)
  validate_new_queue(name)

  response = make_request('CreateQueue', nil, { 'QueueName' => name })
  return if response.nil?

  add_queue(response.get_text("//QueueUrl"))
end
# File lib/activemessaging/adapters/asqs.rb, line 251
# Issue a DeleteMessage request against the message's queue URL, identifying
# the message by its receipt handle. Returns make_request's result.
def delete_message(message)
  queue_url = message.queue.queue_url.to_s
  make_request('DeleteMessage', queue_url, { 'ReceiptHandle' => message.receipt_handle })
end
# File lib/activemessaging/adapters/asqs.rb, line 191
# Issue a DeleteQueue request for the queue's URL after validate_queue has
# accepted the queue. Returns make_request's result.
def delete_queue(queue)
  validate_queue(queue)
  make_request('DeleteQueue', queue.queue_url.to_s)
end
# File lib/activemessaging/adapters/asqs.rb, line 203
# Read attributes of a queue via a GetQueueAttributes request.
#
# queue     - object responding to #queue_url.
# attribute - attribute name to request; defaults to 'All'. It is checked by
#             validate_get_queue_attribute before any request is made.
#
# Returns a Hash of name => value strings when attribute is 'All', otherwise
# the single requested value (nil when the response did not contain it).
# When make_request yields no response, the result is {} for 'All' and nil
# for a single attribute.
def get_queue_attributes(queue, attribute = 'All')
  validate_get_queue_attribute(attribute)

  # The attribute name has to be sent with the request.
  params = { 'AttributeName' => attribute }
  response = make_request('GetQueueAttributes', "#{queue.queue_url}", params)

  attributes = {}
  unless response.nil?
    response.each_node('/GetQueueAttributesResponse/GetQueueAttributesResult/Attribute') do |node|
      # Use names distinct from the block parameter so the node is still
      # available when reading the value.
      name  = node.elements['Name'].text
      value = node.elements['Value'].text
      attributes[name] = value
    end
  end

  attribute == 'All' ? attributes : attributes[attribute]
end
I wrap this so I can move to a different client, or easily mock for testing
# File lib/activemessaging/adapters/asqs.rb, line 298
# Thin wrapper around Net::HTTP so the client can be swapped or mocked.
#
# h - host, p - port, r - request object.
# Returns the result of http.request(r) from inside a Net::HTTP.start block.
def http_request(h, p, r)
  Net::HTTP.start(h, p) do |http|
    http.request(r)
  end
end
# File lib/activemessaging/adapters/asqs.rb, line 196
# List queues through a ListQueues request.
#
# queue_name_prefix - optional prefix; when given it is checked with
#                     validate_queue_name and sent as QueueNamePrefix.
#
# Returns an array with one add_queue result per //QueueUrl node, or an
# empty array when make_request returned nil.
def list_queues(queue_name_prefix = nil)
  params = {}
  unless queue_name_prefix.nil?
    validate_queue_name(queue_name_prefix)
    params["QueueNamePrefix"] = queue_name_prefix
  end

  response = make_request('ListQueues', nil, params)
  return [] if response.nil?

  response.nodes("//QueueUrl").collect { |node| add_queue(node.text) }
end
# File lib/activemessaging/adapters/asqs.rb, line 255
# Build, sign and send a GET request for one action.
#
# action - action name, sent as the 'Action' parameter.
# url    - base URL for the request; defaults to @aws_url.
# params - extra query parameters (string keys). The hash is copied, so the
#          caller's hash is left untouched.
#
# The request is signed with SignatureVersion 1: parameters are sorted by
# lower-cased key, keys and values are concatenated, and the result is
# HMAC-SHA1 signed with @secret_access_key and Base64 encoded.
#
# The request is attempted up to @request_retry_count times. After a failed
# attempt the method sleeps @reconnect_delay and tries again when `reliable`
# is truthy; otherwise the error is re-raised immediately. Only
# StandardError is treated this way, so interrupts and other non-standard
# exceptions always propagate.
#
# Returns the SQSResponse, or nil when every attempt failed.
def make_request(action, url = nil, params = {})
  url ||= @aws_url

  # Copy rather than mutate the argument.
  params = params.merge(
    'Action'           => action,
    'Version'          => @aws_version,
    'AWSAccessKeyId'   => @access_key_id,
    'Expires'          => (Time.now + @request_expires).gmtime.iso8601,
    'SignatureVersion' => '1'
  )

  # Sign the string
  sorted_params  = params.sort_by { |key, _value| key.downcase }
  string_to_sign = sorted_params.collect { |key, value| "#{key}#{value}" }.join
  # OpenSSL::Digest::Digest no longer exists in current Ruby; OpenSSL::Digest is its replacement.
  digest = OpenSSL::Digest.new('sha1')
  hmac   = OpenSSL::HMAC.digest(digest, @secret_access_key, string_to_sign)
  params['Signature'] = Base64.encode64(hmac).chomp

  # Construct request
  query_params = params.collect { |key, value| "#{key}=#{CGI.escape(value.to_s)}" }.join("&")
  request = Net::HTTP::Get.new("#{url}?#{query_params}")

  @request_retry_count.to_i.times do
    begin
      response = SQSResponse.new(http_request(host, port, request))
      check_errors(response)
      return response
    rescue StandardError
      raise unless reliable
      sleep(@reconnect_delay)
    end
  end

  # every attempt failed (or the retry count was not positive)
  nil
end
# File lib/activemessaging/adapters/asqs.rb, line 239
# Fetch messages from a queue with a ReceiveMessage request.
#
# queue        - the queue to read; checked with validate_queue.
# num_messages - sent as MaxNumberOfMessages; checked with
#                validate_number_of_messages.
# timeout      - optional; when truthy it is checked with validate_timeout
#                and sent as VisibilityTimeout.
#
# Returns one Message.from_element result per //Message node, or nil when
# make_request returned nil.
def retrieve_messsages(queue, num_messages = 1, timeout = nil)
  validate_queue(queue)
  validate_number_of_messages(num_messages)
  validate_timeout(timeout) if timeout

  params = { 'MaxNumberOfMessages' => num_messages.to_s }
  params['VisibilityTimeout'] = timeout.to_s if timeout

  response = make_request('ReceiveMessage', queue.queue_url.to_s, params)
  return if response.nil?

  response.nodes("//Message").collect do |node|
    Message.from_element(node, response, queue)
  end
end
in progress
# File lib/activemessaging/adapters/asqs.rb, line 232
# Send one message body to a queue with a SendMessage request.
#
# Both arguments are validated first (validate_queue, validate_message).
# Returns the text of //MessageId from the response, or nil when
# make_request returned nil.
def send_messsage(queue, message)
  validate_queue(queue)
  validate_message(message)

  response = make_request('SendMessage', queue.queue_url, { 'MessageBody' => message })
  return if response.nil?

  response.get_text("//MessageId")
end
# File lib/activemessaging/adapters/asqs.rb, line 220
# Set one attribute on a queue with a SetQueueAttributes request.
#
# attribute - attribute name; checked with validate_set_queue_attribute.
# value     - new value, sent in its to_s form.
#
# Returns make_request's result.
def set_queue_attribute(queue, attribute, value)
  validate_set_queue_attribute(attribute)

  params = {
    'Attribute.Name'  => attribute,
    'Attribute.Value' => value.to_s
  }
  make_request('SetQueueAttributes', queue.queue_url.to_s, params)
end
Private Instance Methods
internal data structure methods
# File lib/activemessaging/adapters/asqs.rb, line 319
# Build a Queue from its URL and, when queue-list caching is enabled,
# store it in the queue table under its name. Returns the Queue.
def add_queue(url)
  queue = Queue.from_url(url)
  queues[queue.name] = queue if self.cache_queue_list
  queue
end
# File lib/activemessaging/adapters/asqs.rb, line 325
# Return the queue registered under queue_name, creating it through
# create_queue when the queue table has no entry for that name.
#
# Raises "could not get or create queue: <name>" when no queue results.
def get_or_create_queue(queue_name)
  queue = queues.fetch(queue_name) { create_queue(queue_name) }
  raise "could not get or create queue: #{queue_name}" unless queue

  queue
end
# File lib/activemessaging/adapters/asqs.rb, line 359
# Allowed message sizes: 1 up to max_message_size * 1024.
# The range is computed once and memoized in @_message_size_range.
def message_size_range
  @_message_size_range ||= begin
    upper_bound = max_message_size * 1024
    1..upper_bound
  end
end
# File lib/activemessaging/adapters/asqs.rb, line 332
# Name => queue table.
#
# Returns the existing table when one is present and cache_queue_list is
# truthy; otherwise rebuilds it from list_queues.
def queues
  cached = @queues
  return cached if cached && cache_queue_list

  # Assign the empty table before listing: add_queue (called from
  # list_queues) reads this method again while the list is being built.
  @queues = {}
  list_queues.each do |queue|
    @queues[queue.name] = queue
  end
  @queues
end
# File lib/activemessaging/adapters/asqs.rb, line 367
# Raise unless qa is listed in GET_QUEUE_ATTRIBUTES.
def validate_get_queue_attribute(qa)
  return if GET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to get: #{GET_QUEUE_ATTRIBUTES.to_sentence}."
end
# File lib/activemessaging/adapters/asqs.rb, line 354
# Check that a message body is present and within the allowed size.
#
# m - the message body string.
#
# The size is measured in bytes (bytesize), not characters, because
# message_size_range is derived from a size limit: a body with multi-byte
# characters is larger than its character count suggests.
#
# Raises a RuntimeError when m is nil or its byte size is outside
# message_size_range.
def validate_message(m)
  raise "Message cannot be nil." if m.nil?

  size = m.bytesize
  return if message_size_range.include?(size)

  raise "Message length, #{size}, must be between #{message_size_range.min} and #{message_size_range.max}."
end
# File lib/activemessaging/adapters/asqs.rb, line 345
# Check a name for a queue that is about to be created: it must pass
# validate_queue_name and must not already be in the queue table.
def validate_new_queue(qn)
  validate_queue_name(qn)
  return unless queues.key?(qn)

  raise "Queue already exists: #{qn}"
end
# File lib/activemessaging/adapters/asqs.rb, line 375
# Raise unless nom lies within NUMBER_OF_MESSAGES.
def validate_number_of_messages(nom)
  return if NUMBER_OF_MESSAGES.include?(nom)

  raise "Number of messages, #{nom}, must be between #{NUMBER_OF_MESSAGES.min} and #{NUMBER_OF_MESSAGES.max}."
end
# File lib/activemessaging/adapters/asqs.rb, line 350
# Raise unless the queue's name is present in the queue table.
def validate_queue(q)
  return if queues.key?(q.name)

  raise "Never heard of queue, can't use it: #{q.name}"
end
validation methods
# File lib/activemessaging/adapters/asqs.rb, line 340
# Check a queue name: its length must fall within QUEUE_NAME_LENGTH and it
# may contain only word characters and hyphens.
def validate_queue_name(qn)
  unless QUEUE_NAME_LENGTH.include?(qn.length)
    raise "Queue name, '#{qn}', must be between #{QUEUE_NAME_LENGTH.min} and #{QUEUE_NAME_LENGTH.max} characters."
  end

  # any character outside \w and '-' makes the name invalid
  return unless qn =~ /[^\w\-\_]/

  raise "Queue name, '#{qn}', must be alphanumeric only."
end
# File lib/activemessaging/adapters/asqs.rb, line 371
# Raise unless qa is listed in SET_QUEUE_ATTRIBUTES.
def validate_set_queue_attribute(qa)
  return if SET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to set: #{SET_QUEUE_ATTRIBUTES.to_sentence}."
end
# File lib/activemessaging/adapters/asqs.rb, line 363
# Raise unless the timeout lies within VISIBILITY_TIMEOUT.
def validate_timeout(to)
  return if VISIBILITY_TIMEOUT.include?(to)

  raise "Timeout, #{to}, must be between #{VISIBILITY_TIMEOUT.min} and #{VISIBILITY_TIMEOUT.max}."
end