AWS Developer Tools Blog
Polling Messages from an Amazon SQS Queue
We’ve recently added a utility class to the AWS SDK for Ruby that makes it easy to poll an Amazon SQS queue for messages.
    poller = Aws::SQS::QueuePoller.new(queue_url)
    poller.poll do |msg|
      puts msg.body
    end
Messages are automatically deleted from the queue at the end of the block. This tool supports receiving and deleting messages in batches, long-polling, client-side tracking of stats, and more.
Long Polling
By default, messages are received using long polling. The poller applies a default :wait_time_seconds of 20 seconds. If you prefer to use the queue's default wait time, pass nil for :wait_time_seconds.
    # disables the 20 second default and uses the queue's
    # ReceiveMessageWaitTimeSeconds attribute instead
    poller.poll(wait_time_seconds: nil) do |msg|
      # ...
    end
When disabling :wait_time_seconds by passing nil, you must ensure the queue's ReceiveMessageWaitTimeSeconds attribute is set to a non-zero value, or you will be short polling. Short polling triggers significantly more API calls.
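For a rough feel of the difference, the back-of-the-envelope sketch below estimates how many ReceiveMessage calls a single poller makes against an empty queue. The helper name empty_receives_per_hour and the 0.1 second round-trip figure are assumptions chosen for illustration; they are not measurements and not part of the SDK.

```ruby
# Rough estimate of ReceiveMessage calls per hour against an EMPTY queue.
# Each call is assumed to take the wait time plus one network round trip.
# round_trip_seconds is an illustrative guess, not a measured value.
def empty_receives_per_hour(wait_time_seconds, round_trip_seconds: 0.1)
  (3600 / (wait_time_seconds + round_trip_seconds)).floor
end

long_poll  = empty_receives_per_hour(20) # 20 second long polling
short_poll = empty_receives_per_hour(0)  # short polling in a tight loop

puts "long polling:  ~#{long_poll} calls/hour"
puts "short polling: ~#{short_poll} calls/hour"
```

Under these assumptions, the short-polling figure is a couple of orders of magnitude larger, which is why the queue attribute matters when you pass nil.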
Batch Receiving Messages
You can specify a maximum number of messages to receive with each polling attempt via :max_number_of_messages. When this is set to a value greater than 1, the block receives an array of messages instead of a single message.
    # receives and yields up to 10 messages at a time
    poller.poll(max_number_of_messages: 10) do |messages|
      messages.each do |msg|
        # ...
      end
    end
The maximum value for :max_number_of_messages is enforced by Amazon SQS, which currently accepts values from 1 to 10.
Visibility Timeouts
When receiving messages, you have a fixed amount of time to process and delete each message before it is added back into the queue. This is the visibility timeout. By default, the queue's VisibilityTimeout attribute is used. You can provide an alternative visibility timeout when polling.
    # override queue visibility timeout
    poller.poll(visibility_timeout: 10) do |msg|
      # do work ...
    end
You can reset the visibility timeout of a single message by calling #change_message_visibility_timeout. This is useful when you need more time to finish processing the message.

    poller.poll do |msg|
      # do work ...

      # need more time for processing
      poller.change_message_visibility_timeout(msg, 60)

      # finish work ...
    end
If you change the visibility timeout of a message to zero, it will return to the queue immediately.
Deleting Messages
Messages are deleted from the queue when the block returns normally.
    poller.poll do |msg|
      # do work
    end # messages deleted here
You can skip message deletion by passing skip_delete: true. This allows you to delete the messages manually using #delete_message or #delete_messages.
    # single message
    poller.poll(skip_delete: true) do |msg|
      poller.delete_message(msg) # if successful
    end

    # message batch
    poller.poll(skip_delete: true, max_number_of_messages: 10) do |messages|
      poller.delete_messages(messages)
    end
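When a batch contains both successes and failures, you may want to delete only the messages that were handled. The sketch below shows one way to do that with skip_delete: true. The names process_batch, Message, and FakePoller are hypothetical and introduced only for this example; FakePoller stands in for a real poller so the code runs without AWS credentials.

```ruby
# Hypothetical helper: run the handler for each message and return only
# the messages that were processed without raising.
def process_batch(messages)
  messages.select do |msg|
    begin
      yield msg
      true
    rescue StandardError => e
      warn "failed to process #{msg.message_id}: #{e.message}"
      false
    end
  end
end

# Stand-ins so the sketch runs offline; a real Aws::SQS::QueuePoller
# and real SQS messages would take their place.
Message = Struct.new(:message_id, :body)

class FakePoller
  attr_reader :deleted

  def initialize(batch)
    @batch = batch
    @deleted = []
  end

  # Yields one batch, mimicking poll(max_number_of_messages: 10).
  def poll(**_options)
    yield @batch
  end

  def delete_messages(messages)
    @deleted.concat(messages)
  end
end

poller = FakePoller.new([Message.new('1', 'ok'), Message.new('2', 'bad')])

poller.poll(skip_delete: true, max_number_of_messages: 10) do |messages|
  done = process_batch(messages) do |msg|
    raise ArgumentError, 'cannot parse body' if msg.body == 'bad'
  end
  # Delete only what succeeded; the rest becomes visible again after
  # the visibility timeout. Skip the call when nothing succeeded.
  poller.delete_messages(done) unless done.empty?
end
```

Messages that are not deleted return to the queue once their visibility timeout expires, so they will be received again later.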
Another way to manage message deletion is to throw :skip_delete from the poll block. You can use this to decide, case by case, whether a message or message batch is deleted:
    poller.poll do |msg|
      begin
        # do work
      rescue
        # unexpected error occurred while processing messages,
        # log it, and skip delete so it can be re-processed later
        throw :skip_delete
      end
    end
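Note that throw here is Ruby's non-local exit, paired with catch, and is unrelated to raising exceptions. The toy loop below is a simplified illustration of that mechanism, not the SDK's actual implementation; toy_poll is a hypothetical name.

```ruby
# Toy illustration of catch/throw: a message is "deleted" only when
# the block finishes without throwing :skip_delete.
def toy_poll(messages)
  deleted = []
  messages.each do |msg|
    catch(:skip_delete) do
      yield msg
      deleted << msg # reached only when the block did not throw
    end
  end
  deleted
end

deleted = toy_poll(%w[a b c]) do |msg|
  throw :skip_delete if msg == 'b' # pretend processing failed
end

puts deleted.inspect # prints ["a", "c"]
```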
Terminating the Polling Loop
By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing :stop_polling from the #before_request callback.
:idle_timeout
This is a configurable maximum number of seconds to wait for a new message before the polling loop exits. By default, there is no idle timeout.
    # stops polling after a minute of no received messages
    poller.poll(idle_timeout: 60) do |msg|
      # ...
    end
:stop_polling
If you want more fine-grained control, you can configure a before request callback to trigger before each long poll. Throwing :stop_polling from this callback will cause the poller to exit normally without making the next request.
    # stop after processing 100 messages
    poller.before_request do |stats|
      throw :stop_polling if stats.received_message_count >= 100
    end

    poller.poll do |msg|
      # do work ...
    end
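The same callback can drive a graceful shutdown: set a flag when the process is asked to stop, and throw :stop_polling the next time the callback runs. The sketch below uses a ToyPoller class, which is a hypothetical stand-in written for this example and not the SDK class, so that the flow can run without AWS access.

```ruby
# Toy poller, NOT the SDK class: it runs the before_request callback
# before each simulated receive and stops when :stop_polling is thrown.
class ToyPoller
  Stats = Struct.new(:request_count, :received_message_count)

  def initialize(messages)
    @messages = messages
    @before_request = nil
  end

  def before_request(&block)
    @before_request = block
  end

  def poll
    stats = Stats.new(0, 0)
    catch(:stop_polling) do
      @messages.each do |msg|
        @before_request&.call(stats)
        stats.request_count += 1
        stats.received_message_count += 1
        yield msg
      end
    end
    stats
  end
end

shutdown = false
# In a real worker the flag might be set from a signal handler, e.g.
# Signal.trap('TERM') { shutdown = true }

poller = ToyPoller.new(%w[a b c d])
poller.before_request { |_stats| throw :stop_polling if shutdown }

seen = []
stats = poller.poll do |msg|
  seen << msg
  shutdown = true if msg == 'b' # simulate a shutdown request mid-run
end

puts "processed #{seen.inspect}, #{stats.received_message_count} received"
```

Because the flag is only checked before each request, the message being processed when the shutdown is requested still finishes normally.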
Tracking Progress
The poller will automatically track a few statistics client-side in a PollerStats object. You can access the poller stats three ways:
- The first block argument of #before_request.
- The second block argument of #poll.
- The return value from #poll.
Here are examples of accessing the statistics.
- Configure a #before_request callback:

    poller.before_request do |stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end
- Accept a second argument in the poll block:

    poller.poll do |msg, stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end
- Use the return value of #poll:

    stats = poller.poll(idle_timeout: 10) do |msg|
      # do work ...
    end
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")
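These counters can also be combined into a single summary line. The sketch below is one possible approach; summarize and FakeStats are hypothetical names, and FakeStats models only the fields mentioned above so the example runs without a real poller.

```ruby
# Stand-in for the poller stats object; only the fields named in the
# text are modeled here.
FakeStats = Struct.new(:request_count, :received_message_count,
                       :last_message_received_at)

# Hypothetical helper: one-line summary including messages per request.
def summarize(stats)
  per_request =
    if stats.request_count.zero?
      0.0
    else
      stats.received_message_count.to_f / stats.request_count
    end
  format('%d messages in %d requests (%.2f per request)',
         stats.received_message_count, stats.request_count, per_request)
end

summary = summarize(FakeStats.new(10, 25, Time.now))
puts summary # prints "25 messages in 10 requests (2.50 per request)"
```

A consistently low messages-per-request figure can suggest that the queue is mostly idle.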
Feedback
Let us know what you think about the new queue poller. Join the conversation in our Gitter channel or open a GitHub issue.