AWS Developer Blog

Polling Messages from a Amazon SQS Queue

by Trevor Rowe | on | in Ruby | Permalink | Comments |  Share

We’ve recently added a utility class to the AWS SDK for Ruby that makes it easy to poll an Amazon SQS queue for messages.

poller = Aws::SQS::QueuePoller.new(queue_url)

poller.poll do |msg|
  puts msg.body
end

Messages are automatically deleted from the queue at the end of the block. This tool supports receiving and deleting messages in batches, long-polling, client-side tracking of stats, and more.

Long Polling

By default, messages are received using long polling. This method will force a default :wait_time_seconds of 20 seconds. If you prefer to use the queue default wait time, then pass a nil value for :wait_time_seconds.

# disables 20 second default, use queue ReceiveMessageWaitTimeSeconds attribute
poller.poll(wait_time_seconds:nil) do |msg|
  # ...
end

When disabling :wait_time_seconds by passing nil, you must ensure the queue ReceiveMessageWaitTimeSeconds attribute is set to a non zero value, or you will be short polling. This will trigger significantly more API calls.

Batch Receiving Messages

You can specify a maximum number of messages to receive with each polling attempt via :max_number_of_messages. When this is set to a positive value, greater than 1, the block will receive an array of messages, instead of a single message.

# receives and yields up to 10 messages at a time
poller.poll(max_number_of_messages:10) do |messages|
  messages.each do |msg|
    # ...
  end
end

The maximum value for :max_number_of_messages is enforced by Amazon SQS.

Visibility Timeouts

When receiving messages, you have a fixed amount of time to process and delete each message before it is added back into the queue. This is the visibility timeout. By default, the queue’s VisibilityTimeout attribute is used. You can provide an alternative visibility timeout when polling.

# override queue visibility timeout
poller.poll(visibility_timeout:10) do |msg|
  # do work ...
end

You can reset the visibility timeout of a single message by calling #change_message_visibility. This is useful when you need more time to finish processing the message.

poller.poll do |msg|

  # do work ...

  # need more time for processing
  poller.change_message_visibility(msg, 60)

  # finish work ...

end

If you change the visibility timeout of a message to zero, it will return to the queue immediately.

Deleting Messages

Messages are deleted from the queue when the block returns normally.

poller.poll do |msg|
  # do work
end # messages deleted here

You can skip message deletion by passing skip_delete: true. This allows you to manually delete the messages using {#deletemessage}, or {#deletemessages}.

# single message
poller.poll(skip_delete: true) do |msg|
  poller.delete_message(msg) # if successful
end

# message batch
poller.poll(skip_delete: true, max_number_of_messages:10) do |messages|
  poller.delete_messages(messages)
end

Another way to manage message deletion is to throw :skip_delete from the poll block. You can use this to choose when a message, or message batch is deleted on an individual basis:

poller.poll do |msg|
  begin
    # do work
  rescue
    # unexpected error occurred while processing messages,
    # log it, and skip delete so it can be re-processed later
    throw :skip_delete
  end
end

Terminating the Polling Loop

By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing :stop_polling from the {#before_request} callback.

:idle_timeout

This is a configurable, maximum number of seconds to wait for a new message before the polling loop exists. By default, there is no idle timeout.

# stops polling after a minute of no received messages
poller.poll(idle_timeout: 60) do |msg|
  # ...
end

:stop_polling

If you want more fine-grained control, you can configure a before request callback to trigger before each long poll. Throwing :stop_polling from this callback will cause the poller to exit normally without making the next request.

# stop after processing 100 messages
poller.before_request do |stats|
  throw :stop_polling if stats.receive_message_count >= 100
end

poller.poll do |msg|
  # do work ...
end

Tracking Progress

The poller will automatically track a few statistics client-side in a PollerStats object. You can access the poller stats three ways:

  • The first block argument of {#before_request}
  • The second block argument of {#poll}.
  • The return value from {#poll}.

Here are examples of accessing the statistics.

  • Configure a {#before_request} callback.

    poller.before_reqeust do |stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end
  • Accept a second argument in the poll block, for example:

    poller.poll do |msg, stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end
  • Return value:

    stats = poller.poll(idle_timeout:10) do |msg|
      # do work ...
    end
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")

Feedback

Let us know what you think about the new queue poller. Join the conversation in our Gitter channel or open a GitHub issue.