AWS Compute Blog

Introducing message archiving and analytics for Amazon SNS

February 12, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.


September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.

This blog post is courtesy of Sebastian Caceres (AWS Consultant, DevOps), Otavio Ferreira (Sr. Manager, Amazon SNS), Prachi Sharma and Mary Gao (Software Engineers, Amazon SNS).

Today, we are announcing the release of a message delivery protocol for Amazon SNS based on Amazon Kinesis Data Firehose. This is a new way to integrate SNS with storage and analytics services, without writing custom code.

SNS provides topics for push-based, many-to-many pub/sub messaging to help you decouple distributed systems, microservices, and event-driven serverless applications. As applications grow, so does the need to archive messages to meet compliance goals. These archives can also provide important operational and business insights.

Previously, custom code was required to create data pipelines, using general-purpose SNS subscription endpoints, such as Amazon SQS queues or AWS Lambda functions. You had to manage data transformation, data buffering, data compression, and the upload to data stores.

Overview

With the new native integration between SNS and Kinesis Data Firehose, you can send messages to storage and analytics services, using a purpose-built SNS subscription type.

Once you configure a subscription, messages published to the SNS topic are sent to the subscribed Kinesis Data Firehose delivery stream. The messages are then delivered to the destination endpoint configured in the delivery stream, which can be an Amazon S3 bucket, an Amazon Redshift table, or an Amazon Elasticsearch Service index.

You can also use a third-party service provider as the destination of a delivery stream, including Datadog, New Relic, MongoDB, and Splunk. No custom code is required to bridge the services. For more information, see Fanout to Kinesis Data Firehose streams, in the SNS Developer Guide.

Amazon SNS subscriber types with Amazon Kinesis Data Firehose.

The new Kinesis Data Firehose subscription type and its destinations are part of the application-to-application (A2A) messaging offering of SNS. The addition of this subscription type expands the SNS A2A offering to include the following use cases:

  • Run analytics on SNS messages, using Amazon Kinesis Data Analytics, Amazon Elasticsearch Service, or Amazon Redshift as a delivery stream destination. You can use this option to gain insights and detect anomalies in workloads.
  • Index and search SNS messages, using Amazon Elasticsearch Service as a delivery stream destination. From there, you can create dashboards using Kibana, a data visualization and exploration tool.
  • Store SNS messages for backup and auditing purposes, using S3 as a destination of choice. You can then use Amazon Athena to query the S3 bucket for analytics purposes.
  • Apply transformation to SNS messages. For example, you may obfuscate personally identifiable information (PII) or protected health information (PHI) using a Lambda function invoked by the delivery stream.
  • Feed SNS messages into cloud-based application monitoring and observability tools, using Datadog, New Relic, or Splunk as a destination. You can choose this option to enrich DevOps or marketing workflows.

As with all supported message delivery protocols, you can filter, monitor, and encrypt messages.

To simplify architecture and further avoid custom code, you can use an SNS subscription filter policy. This enables you to route only the relevant subset of SNS messages to the Kinesis Data Firehose delivery stream. For more information, see SNS message filtering.

To monitor the throughput, you can check the NumberOfMessagesPublished and the NumberOfNotificationsDelivered metrics for SNS, and the IncomingBytes, IncomingRecords, DeliveryToS3.Records and DeliveryToS3.Success metrics for Kinesis Data Firehose. For additional information, see Monitoring SNS topics using CloudWatch and Monitoring Kinesis Data Firehose using CloudWatch.

For security purposes, you can choose to have data encrypted at rest, using server-side encryption (SSE), in addition to encrypted in transit using HTTPS. For more information, see SNS SSE, Kinesis Data Firehose SSE, and S3 SSE.

Applying SNS message archiving and analytics in a use case

For example, consider an airline ticketing platform that operates in a regulated environment. The compliance framework requires that the company archives all ticket sales for at least 5 years.

Example architecture of a flight ticket selling platform.

The platform is based on an event-driven serverless architecture. It has a ticket seller Lambda function that publishes an event to an SNS topic for every ticket sold. The SNS topic fans out the event to subscribed systems that are interested in processing this type of event. In the preceding diagram, two systems are interested: one focused on payment processing, and another on fraud control. Each subscribed system is invoked by an SQS queue and an event processing Lambda function.

To meet the compliance goal on data retention, the airline company subscribes a Kinesis Data Firehose delivery stream to their existing SNS topic. They use an S3 bucket as the stream destination. After this, all events published to the SNS topic are archived in the S3 bucket.

The company can then use Athena to query the S3 bucket with standard SQL to run analytics and gain insights on ticket sales. For example, they can query for the most popular flight destinations or the most frequent flyers.

Subscribing a Kinesis Data Firehose stream to an SNS topic

You can set up a Kinesis Data Firehose subscription to an SNS topic using the AWS Management Console, the AWS CLI, or the AWS SDKs. You can also use AWS CloudFormation to automate the provisioning of these resources.

We use CloudFormation for this example. The provided CloudFormation template creates the following resources:

  • An SNS topic
  • An S3 bucket
  • A Kinesis Data Firehose delivery stream
  • A Kinesis Data Firehose subscription in SNS
  • Two SQS subscriptions in SNS
  • Two IAM roles with access to deliver messages:
    • From SNS to Kinesis Data Firehose
    • From Kinesis Data Firehose to S3

To provision the infrastructure, use the following template:

---
AWSTemplateFormatVersion: '2010-09-09'
Description: Template for creating an SNS archiving use case
Resources:
  ticketUploadStream:
    DependsOn:
    - ticketUploadStreamRolePolicy
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      S3DestinationConfiguration:
        BucketARN: !Sub 'arn:${AWS::Partition}:s3:::${ticketArchiveBucket}'
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 1
        CompressionFormat: UNCOMPRESSED
        RoleARN: !GetAtt ticketUploadStreamRole.Arn
  ticketArchiveBucket:
    Type: AWS::S3::Bucket
  ticketTopic:
    Type: AWS::SNS::Topic
  ticketPaymentQueue:
    Type: AWS::SQS::Queue
  ticketFraudQueue:
    Type: AWS::SQS::Queue
  ticketQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Statement:
          Effect: Allow
          Principal:
            Service: sns.amazonaws.com
          Action:
            - sqs:SendMessage
          Resource: '*'
          Condition:
            ArnEquals:
              aws:SourceArn: !Ref ticketTopic
      Queues:
        - !Ref ticketPaymentQueue
        - !Ref ticketFraudQueue
  ticketUploadStreamSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref ticketTopic
      Endpoint: !GetAtt ticketUploadStream.Arn
      Protocol: firehose
      SubscriptionRoleArn: !GetAtt ticketUploadStreamSubscriptionRole.Arn
  ticketPaymentQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref ticketTopic
      Endpoint: !GetAtt ticketPaymentQueue.Arn
      Protocol: sqs
  ticketFraudQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref ticketTopic
      Endpoint: !GetAtt ticketFraudQueue.Arn
      Protocol: sqs
  ticketUploadStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Sid: ''
          Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: sts:AssumeRole
  ticketUploadStreamRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: FirehoseticketUploadStreamRolePolicy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action:
          - s3:AbortMultipartUpload
          - s3:GetBucketLocation
          - s3:GetObject
          - s3:ListBucket
          - s3:ListBucketMultipartUploads
          - s3:PutObject
          Resource:
          - !Sub 'arn:aws:s3:::${ticketArchiveBucket}'
          - !Sub 'arn:aws:s3:::${ticketArchiveBucket}/*'
      Roles:
      - !Ref ticketUploadStreamRole
  ticketUploadStreamSubscriptionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - sns.amazonaws.com
          Action:
          - sts:AssumeRole
      Policies:
      - PolicyName: SNSKinesisFirehoseAccessPolicy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Action:
            - firehose:DescribeDeliveryStream
            - firehose:ListDeliveryStreams
            - firehose:ListTagsForDeliveryStream
            - firehose:PutRecord
            - firehose:PutRecordBatch
            Effect: Allow
            Resource:
            - !GetAtt ticketUploadStream.Arn

To test, publish a message to the SNS topic. After the delivery stream buffer interval of 60 seconds, the message appears in the destination S3 bucket. For information on message formats, see Amazon SNS message formats in Amazon Kinesis Data Firehose destinations.

Cleaning up

After testing, avoid incurring usage charges by deleting the resources you created during the walkthrough. If you used the CloudFormation template, delete all the objects from the S3 bucket before deleting the stack.

Conclusion

In this post, we show how SNS delivery to Kinesis Data Firehose enables you to integrate SNS with storage and analytics services. The example shows how to create an SNS subscription to use a Kinesis Data Firehose delivery stream to store SNS messages in an S3 bucket.

You can adapt this configuration for your needs for storage, encryption, data transformation, and data pipeline architecture. For more information, see Fanout to Kinesis Data Firehose streams in the SNS Developer Guide.

For details on pricing, see SNS pricing and Kinesis Data Firehose pricing. For more serverless learning resources, visit Serverless Land.