Using Python and Amazon SQS FIFO Queues to Preserve Message Sequencing

by Tara Van Unen | in Python

Thanks to Alexandre Pinhel, a Solutions Architect on our team, for writing this post!

Amazon SQS is a managed message queuing service that makes it simple to decouple application components. We recently announced an entirely new queue type, SQS FIFO (first-in, first-out) queues with exactly-once processing and deduplication. SQS FIFO queues are now available in the US East (Ohio) and US West (Oregon) regions, with more regions to follow. This new type of queue lets you use Amazon SQS for systems that depend on receiving messages in exact order, and exactly once, such as financial services and e-commerce applications. For example, FIFO queues help ensure that mobile banking transactions are processed in the correct sequence, and that inventory updates for online retail sites are processed in the right order. In this post, we show how to use FIFO queues to preserve message sequencing with Python.

FIFO queues complement our existing SQS standard queues, which offer higher throughput, best-effort ordering, and at-least-once delivery. The following diagram compares the features of standard queues vs. FIFO queues. The same API functions apply to both types of queues.

The following use case provides an example of how you can use SQS FIFO queues to exchange sequence-sensitive information. For more information about developing applications using Amazon SQS, see the Amazon SQS Developer Guide.

SQS FIFO Queues Example

In the capital markets industry, some of the most common patterns for exchanging messages with partners and customers are based on messaging technologies with two types of scenarios:

  1. Communication channels between two message queue (MQ) managers (one sender channel and one receiver channel). Each MQ manager hosts its local queue and has an alias to the remote queue hosted on the other side. Messages sent from an MQ manager are not stored locally; the receiving MQ manager stores them for the client applications of the named queues.
  2. A single MQ manager that hosts all the queues and is responsible for message exchange and backup.

You can use Amazon SQS to decouple the components of an application so that these components can run independently, as expected in a messaging use case. The following diagram shows a sample architecture using an SQS queue with processing servers.


To preserve the order of messages, we use FIFO queues. These queues help ensure that trades are received in the correct order, and that a book event is received before an update or cancel event.

Important: The name of a FIFO queue must end with the .fifo suffix.

The following diagram shows a financial use case, where Amazon SQS FIFO queues are used with different processing servers based on the type of messages being managed.


In FIFO queues, Amazon SQS also provides content-based deduplication, in which SQS uses an SHA-256 hash of the message body to treat messages with identical bodies as duplicates. This helps eliminate duplicates in referential systems such as those that manage pricing.

In the following example, we simulate the two parts of a capital market exchange. In the first part, we simulate the application sending the trade status and sending messages to the queue named Trade Status. (In Amazon SQS, the queue will be named TradeStatus.fifo.) The application regularly sends trade status received during the trade lifecycle in the queue (for example, trade received, trade checked, trade confirmed, and so on). In the second part, we simulate a client application that gets the trade status to update an internal website or to send status update notifications to other tools. The script stops after the message is read.

To accomplish this, you can use the following two Python code examples, which use boto3, the AWS SDK for Python.

This first script sends an XML message to a queue named TradeStatus.fifo, and the second script receives the message from the same queue. Messages can contain up to 256 KB of text in any format. Any component can later retrieve the messages programmatically using the Amazon SQS API. You can manage messages larger than 256 KB by using the SQS Extended Client Library for Java, which uses Amazon S3 to store larger payloads.

For queue creation, see the Amazon SQS Developer Guide; a minimal creation sketch also appears below.

Name: TradeStatus.fifo

URL: https://sqs.us-west-2.amazonaws.com/12345678/TradeStatus.fifo
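
If you have not yet created the queue, the following is a minimal boto3 sketch (assuming credentials and a default region are already configured). The FifoQueue attribute makes this a FIFO queue, and ContentBasedDeduplication enables the deduplication behavior described above.

import boto3

sqs = boto3.client('sqs')

# Create the FIFO queue; the name must end with the .fifo suffix.
# With ContentBasedDeduplication enabled, SQS deduplicates using a
# hash of the message body, so send_message calls do not need an
# explicit MessageDeduplicationId.
response = sqs.create_queue(
    QueueName='TradeStatus.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true'
    }
)
print(response['QueueUrl'])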

The scripts below are written for Python 2 (with the input fallback shown, they also run under Python 3).

import boto3

# Get the service resource
sqs = boto3.resource('sqs')

# Get the queue
queue = sqs.get_queue_by_name(QueueName='TradeStatus.fifo')

try:
    userInput = raw_input("Please enter file name: ")
except NameError:
    userInput = input("Please enter file name: ")  # Python 3 fallback

with open(userInput, 'r') as myfile:
    data = myfile.read()

# Send the message. Content-based deduplication is assumed to be
# enabled on the queue; otherwise FIFO queues require an explicit
# MessageDeduplicationId.
response = queue.send_message(
    MessageBody=data,
    MessageGroupId='messageGroup1'
)

# The response is NOT a resource, but gives you a message ID and MD5
print(response.get('MessageId'))
print(response.get('MD5OfMessageBody'))

The following Python code receives the message from the TradeStatus.fifo queue and deletes the message when it’s received. Afterward, the message is no longer available.

import boto3

# Get the service resource
sqs = boto3.resource('sqs')

# Get the queue
queue = sqs.get_queue_by_name(QueueName='TradeStatus.fifo')

# Process messages by printing out body
for message in queue.receive_messages():
    # Print out the body of the message
    print('Hello, {0}'.format(message.body))

    # Let the queue know that the message is processed
    message.delete()

Note: With boto3, you need only the queue name; get_queue_by_name looks up the queue URL for you.

More Resources

In this post, we showed how you can use Amazon SQS FIFO queues to exchange data between distributed systems that depend on receiving messages in exact order, and exactly once. You can get started with SQS FIFO queues using just three simple commands. For more information, see the Amazon SQS Developer Guide.

Polling Messages from an Amazon SQS Queue

by Trevor Rowe | in Ruby

We’ve recently added a utility class to the AWS SDK for Ruby that makes it easy to poll an Amazon SQS queue for messages.

require 'aws-sdk'

poller = Aws::SQS::QueuePoller.new(queue_url) # queue_url: your queue's URL

poller.poll do |msg|
  puts msg.body
end

Messages are automatically deleted from the queue at the end of the block. This tool supports receiving and deleting messages in batches, long-polling, client-side tracking of stats, and more.

Long Polling

By default, messages are received using long polling, which forces a default :wait_time_seconds of 20 seconds. If you prefer to use the queue default wait time, then pass a nil value for :wait_time_seconds.

# disables 20 second default, use queue ReceiveMessageWaitTimeSeconds attribute
poller.poll(wait_time_seconds: nil) do |msg|
  # ...
end

When disabling :wait_time_seconds by passing nil, you must ensure the queue's ReceiveMessageWaitTimeSeconds attribute is set to a non-zero value, or you will be short polling, which triggers significantly more API calls.
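
For readers following along in Python, here is a rough boto3 equivalent of a single long poll. This is an illustrative sketch, not part of the Ruby poller; queue_url is assumed to hold your queue's URL.

import boto3

sqs = boto3.client('sqs')

# Wait up to 20 seconds for a message to arrive. Omitting
# WaitTimeSeconds falls back to the queue's
# ReceiveMessageWaitTimeSeconds attribute.
response = sqs.receive_message(
    QueueUrl=queue_url,  # assumed to be defined
    WaitTimeSeconds=20
)
for message in response.get('Messages', []):
    print(message['Body'])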

Batch Receiving Messages

You can specify a maximum number of messages to receive with each polling attempt via :max_number_of_messages. When this is set to a value greater than 1, the block receives an array of messages instead of a single message.

# receives and yields up to 10 messages at a time
poller.poll(max_number_of_messages: 10) do |messages|
  messages.each do |msg|
    # ...
  end
end

The maximum value for :max_number_of_messages is enforced by Amazon SQS; currently, you can receive at most 10 messages per request.
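
For comparison, here is a hedged boto3 sketch of a batched receive; the 10-message-per-request ceiling is an SQS limit, not an SDK one. Again, queue_url is assumed to be defined.

import boto3

sqs = boto3.client('sqs')

# Receive up to 10 messages (the SQS per-request maximum) in one call.
response = sqs.receive_message(
    QueueUrl=queue_url,  # assumed to be defined
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20
)
messages = response.get('Messages', [])
print('received {0} messages'.format(len(messages)))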

Visibility Timeouts

When receiving messages, you have a fixed amount of time to process and delete each message before it becomes visible to other consumers again. This is the visibility timeout. By default, the queue's VisibilityTimeout attribute is used. You can provide an alternative visibility timeout when polling.

# override queue visibility timeout
poller.poll(visibility_timeout: 10) do |msg|
  # do work ...
end

You can reset the visibility timeout of a single message by calling #change_message_visibility. This is useful when you need more time to finish processing the message.

poller.poll do |msg|

  # do work ...

  # need more time for processing
  poller.change_message_visibility(msg, 60)

  # finish work ...

end

If you change the visibility timeout of a message to zero, it will return to the queue immediately.
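
The equivalent operation in boto3 is change_message_visibility, shown here as a sketch with queue_url assumed to be defined:

import boto3

sqs = boto3.client('sqs')

response = sqs.receive_message(QueueUrl=queue_url)  # assumed to be defined
for message in response.get('Messages', []):
    # Give ourselves 60 more seconds to finish processing this message;
    # setting the timeout to 0 would return it to the queue immediately.
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle'],
        VisibilityTimeout=60
    )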

Deleting Messages

Messages are deleted from the queue when the block returns normally.

poller.poll do |msg|
  # do work
end # messages deleted here

You can skip message deletion by passing skip_delete: true. This allows you to manually delete the messages using {#delete_message} or {#delete_messages}.

# single message
poller.poll(skip_delete: true) do |msg|
  poller.delete_message(msg) # if successful
end

# message batch
poller.poll(skip_delete: true, max_number_of_messages: 10) do |messages|
  poller.delete_messages(messages)
end

Another way to manage message deletion is to throw :skip_delete from the poll block. You can use this to decide, on a case-by-case basis, when a message or message batch is deleted:

poller.poll do |msg|
  begin
    # do work
  rescue
    # unexpected error occurred while processing messages,
    # log it, and skip delete so it can be re-processed later
    throw :skip_delete
  end
end
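
If you are managing deletion by hand in Python, boto3 offers delete_message and delete_message_batch. A sketch, with queue_url assumed to be defined and the success check left as a placeholder:

import boto3

sqs = boto3.client('sqs')

response = sqs.receive_message(
    QueueUrl=queue_url,  # assumed to be defined
    MaxNumberOfMessages=10
)
messages = response.get('Messages', [])

# Delete only the messages that were processed successfully.
entries = [
    {'Id': m['MessageId'], 'ReceiptHandle': m['ReceiptHandle']}
    for m in messages  # filter on your own success criteria here
]
if entries:
    sqs.delete_message_batch(QueueUrl=queue_url, Entries=entries)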

Terminating the Polling Loop

By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing :stop_polling from the {#before_request} callback.

:idle_timeout

This is a configurable maximum number of seconds to wait for a new message before the polling loop exits. By default, there is no idle timeout.

# stops polling after a minute of no received messages
poller.poll(idle_timeout: 60) do |msg|
  # ...
end

:stop_polling

If you want more fine-grained control, you can configure a before request callback to trigger before each long poll. Throwing :stop_polling from this callback will cause the poller to exit normally without making the next request.

# stop after processing 100 messages
poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end

poller.poll do |msg|
  # do work ...
end
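
boto3 has no built-in poller, but both stop conditions are straightforward to hand-roll. The loop below is an illustrative sketch combining an idle timeout with a message-count limit; queue_url is assumed to be defined.

import time

import boto3

sqs = boto3.client('sqs')

IDLE_TIMEOUT = 60    # stop after a minute with no messages
MAX_MESSAGES = 100   # stop after processing 100 messages

received = 0
last_message_at = time.time()

while received < MAX_MESSAGES and time.time() - last_message_at <= IDLE_TIMEOUT:
    response = sqs.receive_message(
        QueueUrl=queue_url,  # assumed to be defined
        WaitTimeSeconds=20
    )
    for message in response.get('Messages', []):
        last_message_at = time.time()
        received += 1
        # do work ...
        sqs.delete_message(QueueUrl=queue_url,
                           ReceiptHandle=message['ReceiptHandle'])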

Tracking Progress

The poller will automatically track a few statistics client-side in a PollerStats object. You can access the poller stats three ways:

  • The first block argument of {#before_request}.
  • The second block argument of {#poll}.
  • The return value from {#poll}.

Here are examples of accessing the statistics.

  • Configure a {#before_request} callback.

    poller.before_request do |stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end
  • Accept a second argument in the poll block, for example:

    poller.poll do |msg, stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end
  • Return value:

    stats = poller.poll(idle_timeout: 10) do |msg|
      # do work ...
    end
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")

Feedback

Let us know what you think about the new queue poller. Join the conversation in our Gitter channel or open a GitHub issue.

Using Amazon SQS Dead Letter Queues

by Norm Johanson | in .NET

After Jason Fulghum recently posted a blog entry about using Amazon SQS dead letter queues with the AWS SDK for Java, I thought his post would be interesting for .NET developers as well. Here is Jason’s post with the code replaced with the C# equivalent.

Amazon SQS recently introduced support for dead letter queues. This feature is an important tool to help your applications consume messages from SQS queues in a more resilient way.

Dead letter queues allow you to set a limit on the number of times a message in a queue is processed. Consider an application that consumes messages from a queue and does some sort of processing based on the message. A bug in your application may only be triggered by certain types of messages or when working with certain data in your application. If your application receives one of these messages, it won’t be able to successfully process it and remove it from the queue. Instead, your application will continue to try to process the message again and again. While this message is being continually retried, your queue is likely filling up with other messages, which your application is unable to process because it’s stuck repeatedly processing the bad message.

Amazon SQS dead letter queues enable you to configure your application so that if it can’t successfully process a problematic message and remove it from the queue, that message will be automatically removed from your queue and delivered to a different SQS queue that you’ve designated as a dead letter queue. Another part of your application can then periodically monitor the dead letter queue and alert you if it contains any messages, which you can debug separately.

Using Amazon SQS dead letter queues is easy. You just need to configure a RedrivePolicy on your queue to specify when messages are delivered to a dead letter queue and to which dead letter queue they should be delivered. You can use the AWS Management Console, or you can access the Amazon SQS API directly with the AWS SDK for .NET.

// First, we'll need an Amazon SQS client object.
IAmazonSQS sqs = new AmazonSQSClient(RegionEndpoint.USWest2);

// Create two new queues:
//     one main queue for our application messages
//     and another to use as our dead letter queue
string qUrl = sqs.CreateQueue(new CreateQueueRequest()
{
    QueueName = "MyApplicationQueue"
}).QueueUrl;

string dlqUrl = sqs.CreateQueue(new CreateQueueRequest()
{
    QueueName = "MyDeadLetterQueue"
}).QueueUrl;

// Next, we need to get the ARN (Amazon Resource Name) of our dead
// letter queue so we can configure our main queue to deliver messages to it.
IDictionary<string, string> attributes = sqs.GetQueueAttributes(new GetQueueAttributesRequest()
{
    QueueUrl = dlqUrl,
    AttributeNames = new List<string>() { "QueueArn" }
}).Attributes;

string dlqArn = attributes["QueueArn"];

// The last step is setting a RedrivePolicy on our main queue to configure
// it to deliver messages to our dead letter queue if they haven't been
// successfully processed after five attempts.
string redrivePolicy = string.Format(
    "{{\"maxReceiveCount\":\"{0}\", \"deadLetterTargetArn\":\"{1}\"}}",
    5, dlqArn);

sqs.SetQueueAttributes(new SetQueueAttributesRequest()
{
    QueueUrl = qUrl,
    Attributes = new Dictionary<string, string>()
    {
        {"RedrivePolicy", redrivePolicy}
    }
});
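
For Python developers, the same setup with boto3 might look roughly like this; the queue names mirror the C# example above.

import json

import boto3

sqs = boto3.client('sqs')

# Create the main queue and the dead letter queue.
q_url = sqs.create_queue(QueueName='MyApplicationQueue')['QueueUrl']
dlq_url = sqs.create_queue(QueueName='MyDeadLetterQueue')['QueueUrl']

# Look up the dead letter queue's ARN.
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url,
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Deliver messages to the dead letter queue after five failed receives.
redrive_policy = json.dumps({
    'maxReceiveCount': '5',
    'deadLetterTargetArn': dlq_arn
})
sqs.set_queue_attributes(
    QueueUrl=q_url,
    Attributes={'RedrivePolicy': redrive_policy}
)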

There’s also a new operation in the Amazon SQS API to help you identify which of your queues are set up to deliver messages to a specific dead letter queue. If you want to know what queues are sending messages to a dead letter queue, just use the IAmazonSQS.ListDeadLetterSourceQueues operation.

IList<string> sourceQueues = sqs.ListDeadLetterSourceQueues(
    new ListDeadLetterSourceQueuesRequest()
    {
        QueueUrl = dlqUrl
    }).QueueUrls;

Console.WriteLine("Source Queues Delivering to " + dlqUrl);
foreach (string queueUrl in sourceQueues)
{
    Console.WriteLine(" * " + queueUrl);
}
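
The corresponding boto3 call is list_dead_letter_source_queues; here is a short sketch continuing the Python example above (sqs client and dlq_url as defined there):

# Find which queues redrive to our dead letter queue.
source_queues = sqs.list_dead_letter_source_queues(
    QueueUrl=dlq_url
)['queueUrls']

print('Source Queues Delivering to ' + dlq_url)
for url in source_queues:
    print(' * ' + url)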

Dead letter queues are a great way to add more resiliency to your queue-based applications. Have you set up any dead letter queues in Amazon SQS yet?

Subscribing an SQS Queue to an SNS Topic

by Norm Johanson | in .NET

In version 2.0.2.3 of the SDK, we added an enhancement that makes it easier to subscribe an Amazon SQS queue to an Amazon SNS topic. You have always been able to subscribe queues to topics using the Subscribe method on the SNS client, but after you subscribed to the topic with your queue, you also had to set a policy on the queue using the SetQueueAttributes method from the SQS client. The policy gives the topic permission to send a message to the queue.

With this new feature, you can call SubscribeQueue from the SNS client, and it will take care of both the subscription and setting up the policy. This code snippet shows how to create a queue and topic, subscribe the queue, and then send a message.

string queueURL = sqsClient.CreateQueue(new CreateQueueRequest
{
    QueueName = "theQueue"
}).QueueUrl;


string topicArn = snsClient.CreateTopic(new CreateTopicRequest
{
    Name = "theTopic"
}).TopicArn;

snsClient.SubscribeQueue(topicArn, sqsClient, queueURL);

// Sleep to wait for the subscribe to complete.
Thread.Sleep(TimeSpan.FromSeconds(5));

// Publish the message to the topic
snsClient.Publish(new PublishRequest
{
    TopicArn = topicArn,
    Message = "Test Message"
});

// Get the message from the queue.
var messages = sqsClient.ReceiveMessage(new ReceiveMessageRequest
{
    QueueUrl = queueURL,
    WaitTimeSeconds = 20
}).Messages;
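
boto3 has no single-call helper like SubscribeQueue, so a Python version must do the two steps explicitly: subscribe the queue, then grant the topic permission to send to it. A hedged sketch:

import json

import boto3

sqs = boto3.client('sqs')
sns = boto3.client('sns')

queue_url = sqs.create_queue(QueueName='theQueue')['QueueUrl']
topic_arn = sns.create_topic(Name='theTopic')['TopicArn']

queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Subscribe the queue to the topic.
sns.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)

# Allow the topic to send messages to the queue.
policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'sns.amazonaws.com'},
        'Action': 'sqs:SendMessage',
        'Resource': queue_arn,
        'Condition': {'ArnEquals': {'aws:SourceArn': topic_arn}}
    }]
}
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'Policy': json.dumps(policy)}
)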

AWS at Symfony Live Portland 2013

by Jeremy Lindblom | in PHP

A few weeks ago, I had the pleasure of attending the Symfony Live Portland 2013 conference. This year, Symfony Live was co-located with the very large DrupalCon, and though I did not attend any of the DrupalCon sessions, I did get to talk to many Drupal developers during lunches and the hack day. It was awesome to be among so many other PHP developers.

I had the honor of being selected as a speaker at Symfony Live, and the topic of my session was Getting Good with the AWS SDK for PHP (here are the slides and Joind.in event). In this talk, I gave a brief introduction to AWS and its services, showed how to use the AWS SDK for PHP, and demonstrated some code from a sample PHP application that uses Amazon S3 and Amazon DynamoDB to manage its data.

How does the SDK integrate with Symfony?

Since I was in the presence of Symfony developers, I made sure to point out some of the ways that the AWS SDK for PHP currently integrates with the Symfony framework and community.

The SDK uses the Symfony Event Dispatcher

The SDK uses the Symfony Event Dispatcher component quite heavily. Not only are many of the internal details of the SDK implemented with events (e.g., request signing), but users of the SDK can listen for events and inject their own logic into the request flow.

For example, the following code attaches an event listener to an SQS client that will capitalize messages sent to a queue via the SendMessage operation.

use Aws\Common\Aws;
use Guzzle\Common\Event; // Extends Symfony\Component\EventDispatcher\Event

$aws = Aws::factory('/path/to/your/config.php');
$sqs = $aws->get('sqs');

$dispatcher = $sqs->getEventDispatcher();
$dispatcher->addListener('command.before_send', function (Event $event) {
    $command = $event['command'];
    if ($command->getName() === 'SendMessage') {
        // Ensure the message is capitalized
        $command['MessageBody'] = ucfirst($command['MessageBody']);
    }
});

$sqs->sendMessage(array(
    'QueueUrl'    => $queueUrl,
    'MessageBody' => 'an awesome message.',
));
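
boto3 exposes a comparable hook through botocore's event system. The sketch below reproduces the capitalization behavior by registering a handler for the provide-client-params event of SendMessage; queue_url is assumed to be defined.

import boto3

def capitalize_message(params, **kwargs):
    # Ensure the message is capitalized before the request is built.
    body = params.get('MessageBody', '')
    params['MessageBody'] = body[:1].upper() + body[1:]

sqs = boto3.client('sqs')
sqs.meta.events.register(
    'provide-client-params.sqs.SendMessage', capitalize_message)

sqs.send_message(
    QueueUrl=queue_url,  # assumed to be defined
    MessageBody='an awesome message.'
)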

We publish an AWS Service Provider for Silex

For Silex users, we publish an AWS Service Provider for Silex that makes it easier to bootstrap the AWS SDK for PHP within a Silex application. I used this service provider in my presentation with the sample PHP application, so make sure to check out my slides.

You can use the Symfony Finder with Amazon S3

In my presentation, I also pointed out our recent addition of the S3 Stream Wrapper to our SDK and how you can use it in tandem with the Symfony Finder component to find files within your Amazon S3 buckets.

The following example shows how you can use the Symfony Finder to find S3 objects in the bucket "jcl-files", with a key prefix of "family-videos", that are smaller than 50 MB in size and no more than a year old.

use Aws\Common\Aws;
use Symfony\Component\Finder\Finder;

$aws = Aws::factory('/path/to/your/config.php');
$aws->get('s3')->registerStreamWrapper();

$finder = new Finder();
$finder->files()
    ->in('s3://jcl-files/family-videos')
    ->size('< 50M')
    ->date('since 1 year ago');

foreach ($finder as $file) {
    echo $file->getFilename() . PHP_EOL;
}
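
There is no direct Finder analog in boto3, but the same query can be approximated by listing the objects and filtering client-side on size and last-modified date. A sketch:

from datetime import datetime, timedelta, timezone

import boto3

s3 = boto3.client('s3')
cutoff = datetime.now(timezone.utc) - timedelta(days=365)

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='jcl-files', Prefix='family-videos/'):
    for obj in page.get('Contents', []):
        # Keep objects smaller than 50 MB, modified within the past year.
        if obj['Size'] < 50 * 1024 * 1024 and obj['LastModified'] >= cutoff:
            print(obj['Key'])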

Others talked about AWS

One of my co-workers, Michael Dowling, also presented at the conference. His presentation was about his open source project, Guzzle, which is a powerful HTTP client library and is used as the foundation of the AWS SDK for PHP. In his talk, Michael also highlighted a few of the ways that the AWS SDK for PHP uses Guzzle. Guzzle is also being used in the core of Drupal 8, so his presentation drew in a crowd of both Drupal and Symfony developers.

Aside from our presentations, there were various sessions focused on the Symfony framework, as well as others on topics like Composer, caching, and cryptography. David Zuelke and Juozas Kaziukėnas both mentioned how they use AWS services in their talks, Surviving a Prime Time TV Commercial and Process any amount of data. Any time, respectively. It was nice to meet in person many PHP developers I’ve talked with online and to participate in Symfony Live traditions such as PHP Jeopardy and karaoke.

While at the conference, I talked to several developers about what would make a good AWS Symfony bundle or Drupal module, but I’m also curious to find out what you think. So… what would you like to see in an AWS Symfony Bundle? What would make a good AWS Drupal module? Let us know your thoughts in the comments.