AWS Compute Blog
Building event-driven architectures with Amazon SNS FIFO
This post is courtesy of Christian Mueller, Principal Solutions Architect.
Developers increasingly adopt event-driven architectures to decouple their distributed applications. Often, these events must be propagated in a strictly ordered manner to all subscribed applications. Using Amazon SNS FIFO topics and Amazon SQS FIFO queues, you can address use cases that require end-to-end message ordering, deduplication, filtering, and encryption.
In this blog post, I introduce a sample event-driven architecture. I walk through an implementation based on Amazon SNS FIFO topics and Amazon SQS FIFO queues.
Common requirements in event-driven architectures
In event-driven architectures, data consistency is a common business requirement. This is often translated into technical requirements such as zero message loss and strict message ordering. For example, if you update your domain object rapidly, you want to be sure that all events are received by each subscriber in exactly the order they occurred. This way, the current domain object state is what each subscriber received as the latest update event. Similarly, all update events should be received after the initial create event.
Before Amazon SNS FIFO, architects had to design receiving applications to detect out-of-order messages and handle them before processing.
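As an illustration, here is a minimal sketch of such a check, assuming each event carries a monotonically increasing sequence number per job (a hypothetical field, not part of this sample):

last_seen = {}  # job ID -> highest sequence number processed so far

def is_out_of_order(event):
    # Treat anything at or below the last processed sequence number
    # as a stale or duplicate event
    job_id = event['jobId']
    sequence = event['sequence']
    if sequence <= last_seen.get(job_id, -1):
        return True
    last_seen[job_id] = sequence
    return False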
Another common challenge is preventing message duplicates when sending events to the messaging service. If an event publisher receives an error, such as a network timeout, the publisher does not know if the messaging service could receive and successfully process the message or not.
The client may retry, as this is the default behavior for some HTTP response codes in AWS SDKs. This can cause duplicate messages.
Before Amazon SNS FIFO, developers had to design receivers to be idempotent. In cases where processing an event is not naturally idempotent, the receiver must track which events it has already handled. Often, this is done by adding a key-value store like Amazon DynamoDB or Amazon ElastiCache for Redis to the service, so the receiver can check whether an event has been seen before.
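For example, the receiver can record each message ID with a conditional write and skip processing when the ID already exists. A minimal sketch, assuming a DynamoDB table named “ProcessedEvents” with a “messageId” partition key (both names are illustrative):

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client('dynamodb')

def seen_before(message_id):
    # The conditional put fails if this message ID was already recorded
    try:
        dynamodb.put_item(
            TableName='ProcessedEvents',
            Item={'messageId': {'S': message_id}},
            ConditionExpression='attribute_not_exists(messageId)'
        )
        return False
    except ClientError as error:
        if error.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True  # duplicate event; safe to skip
        raise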
Exploring the recruiting agency example
This sample application models a recruitment agency with a job listings website. The application is composed of multiple services. I explain three of them in more detail.
A custom service, the anti-corruption service, receives a change data capture (CDC) event stream of changes from a relational database. This service translates the low-level technical database events into meaningful business events that the domain services can consume easily. These business events are sent to the SNS FIFO “JobEvents.fifo” topic, where interested services subscribe to them and process them asynchronously.
In this domain, the analytics service is interested in all events. It has an SQS FIFO “AnalyticsJobEvents.fifo” queue subscribed to the SNS FIFO “JobEvents.fifo” topic. It uses SQS FIFO as an event source for AWS Lambda, which processes these events and stores them in Amazon S3. S3 is an object storage service offering high scalability, data availability, durability, security, and performance. This allows you to use services like Amazon EMR, AWS Glue, or Amazon Athena to gain insights from your data and extract value.
The inventory service owns an SQS FIFO “InventoryJobEvents.fifo” queue, which is subscribed to the SNS FIFO “JobEvents.fifo” topic. It is only interested in “JobCreated” and “JobDeleted” events, as it only tracks which jobs are currently available and stores this information in a DynamoDB table. Therefore, it uses an SNS filter policy to receive only these events, instead of all events.
This sample application focuses on the SNS FIFO capabilities, so I do not explore other services subscribed to the SNS FIFO topic. Following the SQS best practices and SNS redrive policy recommendations, the sample configures dead-letter queues (DLQs). These are useful in case SNS cannot deliver an event to a subscribed SQS queue, or if a Lambda function fails to process an event from its SQS FIFO queue multiple times. In both cases, the attached DLQ must itself be an SQS FIFO queue.
Deploying the application
The application is deployed using infrastructure as code with the AWS Serverless Application Model (AWS SAM). SAM provides shorthand syntax to express functions, APIs, databases, and event source mappings, which is expanded into AWS CloudFormation syntax during deployment.
To get started, clone the “event-driven-architecture-with-amazon-sns-fifo” repository. Alternatively, download the repository as a ZIP file and extract it to a directory of your choice.
As prerequisites, you must have the AWS SAM CLI, Python 3, and pip installed. You must also have the AWS CLI configured properly.
Navigate to the root directory of this project and build the application with SAM. SAM downloads required dependencies and stores them locally. Execute the following commands in your terminal:
git clone https://github.com/aws-samples/event-driven-architecture-with-amazon-sns-fifo.git
cd event-driven-architecture-with-amazon-sns-fifo
sam build
Now, deploy the application:
sam deploy --guided
Provide arguments for the deployment, such as the stack name and preferred AWS Region.
Learning more about the implementation
Next, I explore the three services that form this sample application and how they use SNS FIFO features.
Anti-corruption service
The anti-corruption service owns the SNS FIFO “JobEvents.fifo” topic, where it publishes business events related to job postings. It uses an SNS FIFO topic, as end-to-end ordering per job ID is required. The topic is configured not to perform content-based deduplication, because each event is published with its own unique message deduplication ID. The corresponding definition in the SAM template looks like this:
JobEventsTopic:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: JobEvents.fifo
    FifoTopic: true
    ContentBasedDeduplication: false
For simplicity, the anti-corruption function in the sample application doesn’t consume an external database CDC stream. It uses Amazon CloudWatch Events as an event source to trigger the function every minute.
I provide the SNS FIFO topic Amazon Resource Name (ARN) as an environment variable to the function, which makes the function more portable across environments and stages. The function's AWS Identity and Access Management (IAM) policy grants permissions to publish messages to only this SNS topic:
AntiCorruptionFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: anti-corruption-service/
    Handler: app.lambda_handler
    Runtime: python3.7
    MemorySize: 256
    Environment:
      Variables:
        TOPIC_ARN: !Ref JobEventsTopic
    Policies:
      - SNSPublishMessagePolicy:
          TopicName: !GetAtt JobEventsTopic.TopicName
    Events:
      Trigger:
        Type: Schedule
        Properties:
          Schedule: 'rate(1 minute)'
The anti-corruption function uses features of the SNS publish API that allow you to define a “MessageDeduplicationId” and a “MessageGroupId”. The “MessageDeduplicationId” is used to filter out duplicate messages sent to SNS FIFO within a 5-minute deduplication interval. The “MessageGroupId” is required, as SNS FIFO processes all job events for the same message group in a strictly ordered manner, isolated from other message groups processed through the same topic.
Another important aspect in this implementation is the use of “MessageAttributes”. I define a message attribute with the name “eventType” and values like “JobCreated”, “JobSalaryUpdated”, and “JobDeleted”. This allows subscribers to define SNS filter policies to receive only the events they are interested in:
import boto3
from datetime import datetime
import json
import os
import random
import uuid

TOPIC_ARN = os.environ['TOPIC_ARN']
sns = boto3.client('sns')

def lambda_handler(event, context):
    jobId = str(random.randrange(0, 1000))

    send_job_created_event(jobId)
    send_job_updated_event(jobId)
    send_job_deleted_event(jobId)
    return

def send_job_created_event(jobId):
    messageId = str(uuid.uuid4())
    response = sns.publish(
        TopicArn=TOPIC_ARN,
        Subject=f'Job {jobId} created',
        MessageDeduplicationId=messageId,
        MessageGroupId=f'JOB-{jobId}',
        Message={...},
        MessageAttributes={
            'eventType': {
                'DataType': 'String',
                'StringValue': 'JobCreated'
            }
        }
    )
    print('sent message and received response: {}'.format(response))
    return

def send_job_updated_event(jobId):
    messageId = str(uuid.uuid4())
    response = sns.publish(...)
    print('sent message and received response: {}'.format(response))
    return

def send_job_deleted_event(jobId):
    messageId = str(uuid.uuid4())
    response = sns.publish(...)
    print('sent message and received response: {}'.format(response))
    return
Analytics service
The analytics service owns an SQS FIFO “AnalyticsJobEvents.fifo” queue, which is subscribed to the SNS FIFO “JobEvents.fifo” topic. Following best practices, I define redrive policies for the SQS FIFO queue and the SNS FIFO subscription in the template:
AnalyticsJobEventsQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: AnalyticsJobEvents.fifo
    FifoQueue: true
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt AnalyticsJobEventsQueueDLQ.Arn
      maxReceiveCount: 3

AnalyticsJobEventsQueueToJobEventsTopicSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    Endpoint: !GetAtt AnalyticsJobEventsQueue.Arn
    Protocol: sqs
    RawMessageDelivery: true
    TopicArn: !Ref JobEventsTopic
    RedrivePolicy: !Sub '{"deadLetterTargetArn": "${AnalyticsJobEventsSubscriptionDLQ.Arn}"}'
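The dead-letter queues referenced here must themselves be SQS FIFO queues. As a sketch, the queue DLQ could be declared as follows (the QueueName value is illustrative; see the repository template for the actual definition):

AnalyticsJobEventsQueueDLQ:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: AnalyticsJobEventsDLQ.fifo
    FifoQueue: true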
The analytics function uses SQS FIFO as an event source for Lambda. The S3 bucket name is provided as an environment variable to the function, which increases code portability across environments and stages. The IAM policy for this function grants permissions to write objects to only this S3 bucket:
AnalyticsFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: analytics-service/
    Handler: app.lambda_handler
    Runtime: python3.7
    MemorySize: 256
    Environment:
      Variables:
        BUCKET_NAME: !Ref AnalyticsBucket
    Policies:
      - S3WritePolicy:
          BucketName: !Ref AnalyticsBucket
    Events:
      Trigger:
        Type: SQS
        Properties:
          Queue: !GetAtt AnalyticsJobEventsQueue.Arn
          BatchSize: 10
View the function implementation at the GitHub repo.
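As a rough sketch of the handler's shape (the actual implementation is in the repo; the object key scheme below is illustrative), it writes each received event as an object to the bucket:

import boto3
import os
import uuid

BUCKET_NAME = os.environ['BUCKET_NAME']
s3 = boto3.client('s3')

def lambda_handler(event, context):
    # With raw message delivery enabled, each SQS record body is the
    # original job event published to the SNS FIFO topic
    for record in event['Records']:
        key = f'events/{uuid.uuid4()}.json'
        s3.put_object(Bucket=BUCKET_NAME, Key=key, Body=record['body'])
    return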
Inventory service
The inventory service also owns an SQS FIFO “InventoryJobEvents.fifo” queue, which is subscribed to the SNS FIFO “JobEvents.fifo” topic, and it uses redrive policies for the SQS FIFO queue and the SNS FIFO subscription as well. This service is only interested in certain events, so it uses an SNS filter policy to specify these events:
InventoryJobEventsQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: InventoryJobEvents.fifo
    FifoQueue: true
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt InventoryJobEventsQueueDLQ.Arn
      maxReceiveCount: 3

InventoryJobEventsQueueToJobEventsTopicSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    Endpoint: !GetAtt InventoryJobEventsQueue.Arn
    Protocol: sqs
    RawMessageDelivery: true
    TopicArn: !Ref JobEventsTopic
    FilterPolicy: '{"eventType":["JobCreated", "JobDeleted"]}'
    RedrivePolicy: !Sub '{"deadLetterTargetArn": "${InventoryJobEventsQueueSubscriptionDLQ.Arn}"}'
The inventory function also uses SQS FIFO as an event source for Lambda. The DynamoDB table name is set as an environment variable, so the function can look up the name during initialization. The IAM policy grants read/write permissions for only this table:
InventoryFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: inventory-service/
    Handler: app.lambda_handler
    Runtime: python3.7
    MemorySize: 256
    Environment:
      Variables:
        TABLE_NAME: !Ref InventoryTable
    Policies:
      - DynamoDBCrudPolicy:
          TableName: !Ref InventoryTable
    Events:
      Trigger:
        Type: SQS
        Properties:
          Queue: !GetAtt InventoryJobEventsQueue.Arn
          BatchSize: 10
View the function implementation at the GitHub repo.
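For orientation, here is a rough sketch of what this handler might look like, assuming the event body carries a “jobId” field and the table uses “jobId” as its partition key (both are assumptions; the actual implementation is in the repo):

import boto3
import json
import os

TABLE_NAME = os.environ['TABLE_NAME']
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(TABLE_NAME)

def lambda_handler(event, context):
    for record in event['Records']:
        # With raw message delivery, the SNS message attributes arrive
        # as SQS message attributes on each record
        event_type = record['messageAttributes']['eventType']['stringValue']
        job = json.loads(record['body'])
        if event_type == 'JobCreated':
            table.put_item(Item={'jobId': job['jobId']})
        elif event_type == 'JobDeleted':
            table.delete_item(Key={'jobId': job['jobId']})
    return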
Conclusion
Amazon SNS FIFO topics can simplify the design of event-driven architectures and reduce custom code in building such applications.
By using the native integration with Amazon SQS FIFO queues, you can also build architectures that fan out to multiple subscribers. This pattern helps achieve data consistency, deduplication, filtering, and encryption in near real time, using managed services.
For information on regional availability and service quotas, see SNS endpoints and quotas and SQS endpoints and quotas. For more information on the FIFO functionality, see SNS FIFO and SQS FIFO in their Developer Guides.