AWS Compute Blog

Serverless Cross Account Stream Replication Using AWS Lambda, Amazon DynamoDB, and Amazon Kinesis Firehose

February 12, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.


September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.


This is a guest post by Richard Freeman, Ph.D., a solutions architect and data scientist at JustGiving. JustGiving in their own words: “We are one of the world’s largest social platforms for giving that’s helped 27.7 million users in 196 countries raise $4.1 billion for over 27,000 good causes.”

At JustGiving, we want our analysts and data scientists to have access to production Amazon Kinesis data in near real-time and have the flexibility to make transformations, but without compromising on security. Rather than build a custom service using the Kinesis Client Library (KCL) or maintain an Amazon EC2 instance running a Java Kinesis Agent, we used an AWS Lambda function that does not require us to maintain a running server. The Lambda function is used to process Amazon Kinesis events, enrich them, and write them to Amazon DynamoDB in another AWS account.

After the data is stored in DynamoDB, further systems can process the data as a stream; we persist the data to S3 via Amazon Kinesis Firehose using another Lambda function. This gives us a truly serverless environment, where all the infrastructure including the integration, connectors, security, and scalability is managed by AWS, and allows us to focus only on the stream transformation logic rather than on code deployment, systems integration, or platform.

This post shows you how to process a stream of Amazon Kinesis events in one AWS account, and persist it into a DynamoDB table and Amazon S3 bucket in another account, using only fully managed AWS services and without running any servers or clusters. For this walkthrough, I assume that you are familiar with DynamoDB, Amazon Kinesis, Lambda, IAM, and Python.

Overview

Amazon Kinesis Streams is a stream processing engine that can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, web logs, sensors, and location-tracking events.

Working with a production environment and a proof of concept (POC) environment that are in different AWS accounts is useful if you have web analytics traffic written to Amazon Kinesis in production and data scientists need near real-time and historical access to the data in another non-operational, non-critical, and more open AWS environment for experiments. In addition, you may want to persist the Amazon Kinesis records in DynamoDB without duplicates.

The following diagram shows how the two environments are structured.

The production environment contains multiple web servers running data producers that write web events to Amazon Kinesis, which causes a Lambda function to be invoked with a batch of records. The Lambda function (1) assumes a role in the POC account, and continuously writes those events to a DynamoDB table without duplicates.

After the data is persisted in DynamoDB, further processing can be triggered via DynamoDB Streams and invoke a Lambda function (2) that could be used to perform streaming analytics on the events or to persist the data to S3 (as described in this post). Note that the Lambda function (1) can also be combined with what the function (2) does, but we wanted to show how to process both Amazon Kinesis stream and DynamoDB Streams data sources. Also, having the second function (2) allows data scientists to project, transform, and enrich the stream for their requirements while preserving the original raw data stream.

Setting up the POC environment

The POC environment is where we run data science experiments, and is the target environment for the production Amazon Kinesis data. For this example, give the POC the account number: 999999999999.

Create a target table in the POC account

The Amazon Kinesis events are written to this DynamoDB table. The following steps create the table using the AWS console:

  1. Open the DynamoDB console.
  2. On the navigation pane, choose Create Table and configure the following:
    • For Table name, enter prod-raven-lambda-webanalytics-crossaccount.
    • For Primary key (hash/partition key), enter seqNumbershardIdHash and choose String.
    • Choose Add sort key.
    • For Sort key Name (range), enter sequenceNumber and choose String.
  3. Under Table settings, clear Use default settings.
  4. Choose Create.
  5. Under Tables, select the table prod-raven-lambda-webanalytics-crossaccount and configure the following:
    • Choose Capacity.
    • Under Provisioned Capacity:
    • For Read capacity units, enter 20.
    • For Write capacity units, enter 20.
    • Choose Save.
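
If you prefer to script the table creation rather than use the console, the same settings could be expressed roughly as follows. This is a minimal sketch, not the method used in this post: the helper names are hypothetical, and it assumes you pass in a boto3 DynamoDB client created in the POC account.

```python
def build_table_definition(table_name='prod-raven-lambda-webanalytics-crossaccount',
                           read_units=20, write_units=20):
    """Return keyword arguments for a DynamoDB create_table call."""
    return {
        'TableName': table_name,
        # Only key attributes are declared; all other attributes are schemaless.
        'AttributeDefinitions': [
            {'AttributeName': 'seqNumbershardIdHash', 'AttributeType': 'S'},
            {'AttributeName': 'sequenceNumber', 'AttributeType': 'S'},
        ],
        'KeySchema': [
            {'AttributeName': 'seqNumbershardIdHash', 'KeyType': 'HASH'},
            {'AttributeName': 'sequenceNumber', 'KeyType': 'RANGE'},
        ],
        'ProvisionedThroughput': {
            'ReadCapacityUnits': read_units,
            'WriteCapacityUnits': write_units,
        },
    }


def create_table(dynamodb_client, **overrides):
    """Create the table with a caller-supplied boto3 DynamoDB client."""
    return dynamodb_client.create_table(**build_table_definition(**overrides))
```

For example, `create_table(boto3.client('dynamodb', region_name='eu-west-1'))` would create the table with 20 read and 20 write capacity units.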

Notes:

  • Optionally, and depending on your records, you can also create a global secondary index if you want to search a subset of the records by user ID or email address – this is very useful for QA in development and staging environments.
  • Provisioned throughput capacity should be set based on the number of events expected to be read and written per second. If the reads or writes are above this capacity for a period of time, then throttling occurs, and writes must be retried. There are many strategies to deal with this, including setting the capacity dynamically depending on the currently consumed read or write capacity, or having exponential backoff retry logic.
  • DynamoDB is a schemaless database, so no other fields or columns need to be specified up front. Each item in the table can have a different number of elements.
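
The exponential backoff strategy mentioned in the notes above can be sketched as follows. This is an illustrative helper under stated assumptions rather than the code we run: `call_with_backoff`, `backoff_delays`, and the `is_throttled` predicate are hypothetical names, and the caller decides which exceptions count as throttling.

```python
import random
import time


def backoff_delays(max_attempts, base_delay=0.05, cap=5.0):
    """Yield capped exponential delays: base, 2*base, 4*base, ... up to cap."""
    for attempt in range(max_attempts):
        yield min(cap, base_delay * (2 ** attempt))


def call_with_backoff(operation, is_throttled, max_attempts=5,
                      base_delay=0.05, sleep=time.sleep):
    """Call operation(), retrying with jittered backoff while it is throttled."""
    for delay in backoff_delays(max_attempts, base_delay):
        try:
            return operation()
        except Exception as error:
            if not is_throttled(error):
                raise  # not a throttling error, so do not retry
            # Sleep a random amount up to the current delay ("full jitter").
            sleep(random.uniform(0, delay))
    raise RuntimeError('still throttled after {} attempts'.format(max_attempts))
```

A write could then be wrapped as `call_with_backoff(lambda: table.put_item(Item=item), is_throttled)`, where `is_throttled` inspects the error raised by the SDK.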

Create an IAM policy in the POC account

First, create an IAM policy that grants read and write access to the DynamoDB table. This policy is attached to the role that the Lambda function assumes.

  1. In the POC account, open the IAM console.
  2. On the navigation pane, choose Policies, Create Policy.
  3. Choose Create Your Own Policy and configure the following:
    • For Policy Name, enter prod-raven-lambda-webanalytics-crossaccount.
    • For Description, enter “Read/write access to the POC prod-raven-lambda-webanalytics-crossaccount table” or similar text.
    • For Policy Document, insert the following JSON:
{    
    "Version": "2012-10-17",    
    "Statement": [        
        {            
            "Sid": "Stmt345926348000",
            "Effect": "Allow",
            "Action": [
                "dynamodb:BatchGetItem",
                "dynamodb:BatchWriteItem",
                "dynamodb:DescribeStream",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:GetRecords",
                "dynamodb:ListTables",
                "dynamodb:PutItem",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:UpdateItem",
                "dynamodb:UpdateTable"],
            "Resource": [ "arn:aws:dynamodb:<region>:<999999999999>:table/prod-raven-lambda-webanalytics-crossaccount" ]
        }    
    ]
}

The Action array lists the actions that are allowed on the specified resource. Here, the resource is the DynamoDB table created earlier. Replace <region> with your region, e.g., eu-west-1, and <999999999999> with your AWS account ID.
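
To avoid editing the placeholders by hand, the policy document could be generated. The following is a small sketch; `table_policy` is a hypothetical helper and the action list is shortened for brevity.

```python
import json


def table_policy(region, account_id, table_name, actions):
    """Build an IAM policy document that allows `actions` on one DynamoDB table."""
    table_arn = 'arn:aws:dynamodb:{}:{}:table/{}'.format(region, account_id, table_name)
    return {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Action': list(actions),
            'Resource': [table_arn],
        }],
    }


policy = table_policy('eu-west-1', '999999999999',
                      'prod-raven-lambda-webanalytics-crossaccount',
                      ['dynamodb:PutItem', 'dynamodb:GetItem', 'dynamodb:Query'])
print(json.dumps(policy, indent=4))
```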

Create an IAM role

Next, create an IAM role which uses this policy, so that Lambda can assume an identity with the indicated privileges.

  1. Open the IAM console, choose Roles, Create New Role.
    • For Role Name, enter DynamoDB-ProdAnalyticsWrite-role.
    • Choose Role for Cross Account Access to provide access between AWS accounts you own.
    • For Establish Trust, enter the production AWS account ID, e.g., 111111111111.
    • Select the policy created earlier, prod-raven-lambda-webanalytics-crossaccount.
  2. Choose Review, Create Role.

This role is created with an Amazon Resource Name (ARN), e.g., arn:aws:iam::999999999999:role/DynamoDB-ProdAnalyticsWrite-role. You can now see that the policy is attached and that the trusted account ID is 111111111111.
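
Behind the scenes, the trust relationship is stored on the role as a policy document. As a rough illustration, a trust policy that lets principals in the production account assume the role typically looks like the output of the sketch below; `cross_account_trust_policy` is a hypothetical helper, and the document the console generates for you may contain additional fields.

```python
import json


def cross_account_trust_policy(trusted_account_id):
    """Build a trust policy that lets the trusted account call sts:AssumeRole."""
    return {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            # The account root principal delegates the decision to IAM
            # policies inside the trusted (production) account.
            'Principal': {'AWS': 'arn:aws:iam::{}:root'.format(trusted_account_id)},
            'Action': 'sts:AssumeRole',
        }],
    }


print(json.dumps(cross_account_trust_policy('111111111111'), indent=4))
```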

Setting up the production environment

In the production environment, you use a Lambda function to read, parse, and transform the Amazon Kinesis records and write them to DynamoDB. Access to Amazon Kinesis and DynamoDB is fully managed through IAM policies and roles. At this point, you need to sign out of the POC account and sign in as a user with IAM administration rights in the production account 111111111111.

Create a Lambda policy

Create a policy that allows the production Lambda function to assume the role created in the POC account.

  1. In the production environment, open the IAM console and choose Policies.
  2. Choose Create Policy, Create Your Own Policy.
    1. For Policy Name, enter prod-raven-lambda-assumerole.
    2. For Description, enter “This policy allows the Lambda function to execute, write to CloudWatch and assume a role in POC” or similar text.
    3. For Policy Document, insert the following JSON:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:DescribeStream",
                "kinesis:ListStreams",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Resource": "*",
            "Action": [
                "logs:*"
            ],
            "Effect": "Allow"
        },
        {
            "Sid": "Stmt1435680952001",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole",
                "iam:GenerateCredentialReport",
                "iam:Get*",
                "iam:List*"
            ],
            "Resource": [
            "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::<999999999999>:role/DynamoDB-ProdAnalyticsWrite-role"
        }
    ]
}

Create a Lambda execution IAM role

  1. In the IAM console, choose Roles, Create New Role.
  2. For Set Role Name, enter LambdaKinesisDynamo and choose Next step.
  3. For Role Type, choose AWS Service Roles, AWS Lambda.
  4. Select the policy created earlier, prod-raven-lambda-assumerole, and choose Next step.
  5. Choose Review, Create Role.

Create the processor Lambda function

  1. Open the Lambda console and choose Create a Lambda Function.
  2. Select the blueprint kinesis-process-record-python.
  3. Configure the event source:
    • For Event Source type, choose Kinesis.
    • For Kinesis Stream, select web-analytics (or your stream).
    • For Batch Size, enter 300 (this depends on the frequency that events are added to Amazon Kinesis).
    • For Starting position, choose Trim horizon.
  4. Configure the function:
    • For Name, enter ProdKinesisToPocDynamo.
    • For Runtime, choose Python 2.7.
    • For Edit code inline, add the Lambda function source code (supplied in the next section).
    • For Handler, choose lambda_function.lambda_handler.
    • For Role, choose the role that you just created, LambdaKinesisDynamo.
    • For Memory, choose 128 (this depends on the frequency that events are added to Amazon Kinesis).
    • For Timeout, choose 2 min 0 sec (this depends on the frequency that events are added to Amazon Kinesis).
  5. Review the configuration:
    • For Enable event source, choose Enable later (this can be enabled later via the Event sources tab).
  6. Choose Create function.
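
The event source settings above could also be applied programmatically. The sketch below only builds the arguments; `event_source_mapping_args` is a hypothetical helper, and the region and account ID are placeholders you would supply.

```python
def event_source_mapping_args(region, account_id, stream_name='web-analytics',
                              function_name='ProdKinesisToPocDynamo',
                              batch_size=300, enabled=False):
    """Build arguments for the Lambda create_event_source_mapping call."""
    stream_arn = 'arn:aws:kinesis:{}:{}:stream/{}'.format(region, account_id, stream_name)
    return {
        'EventSourceArn': stream_arn,
        'FunctionName': function_name,
        'BatchSize': batch_size,
        # TRIM_HORIZON starts from the oldest record still held in the stream.
        'StartingPosition': 'TRIM_HORIZON',
        # Mirrors "Enable later": the mapping exists but does not invoke yet.
        'Enabled': enabled,
    }
```

With a boto3 Lambda client in the production account, this could be used as `boto3.client('lambda').create_event_source_mapping(**event_source_mapping_args('eu-west-1', '111111111111'))`.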

Create the Lambda function code

At the time of this post, Lambda functions support the Python, Node.js, and Java languages. For this task, you implement the function in Python with the following basic logic:

  • Assume a role that allows the Lambda function to write to the DynamoDB table. Manually configure the DynamoDB client to use the newly-assumed cross account role.
  • Convert Amazon Kinesis events to the DynamoDB format.
  • Write the record to the DynamoDB table.

Assume a cross account role for access to the POC account DynamoDB table

This code snippet uses the AWS Security Token Service (AWS STS) which enables the creation of temporary IAM credentials for an IAM role. The assume_role() function returns a set of temporary credentials (consisting of an access key ID, a secret access key, and a security token) that can be used to access the DynamoDB table via the IAM role arn:aws:iam::999999999999:role/DynamoDB-ProdAnalyticsWrite-role.

Assume the role

import base64, json, boto3

def lambda_handler(event, context):
    client = boto3.client('sts')
    sts_response = client.assume_role(RoleArn='arn:aws:iam::<999999999999>:role/DynamoDB-ProdAnalyticsWrite-role',
                                      RoleSessionName='AssumePocRole', DurationSeconds=900)

Configure the DynamoDB client using a cross account role

    dynamodb = boto3.resource(service_name='dynamodb', region_name='<region>',
                              aws_access_key_id=sts_response['Credentials']['AccessKeyId'],
                              aws_secret_access_key=sts_response['Credentials']['SecretAccessKey'],
                              aws_session_token=sts_response['Credentials']['SessionToken'])

You can now use any DynamoDB methods in the AWS SDK for Python (Boto 3) to write to DynamoDB tables in the POC environment, from the production environment.
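
Because the temporary credentials expire (after 900 seconds in this example), it can help to separate the credential handling from the handler. The following is a sketch under my own assumptions: `session_kwargs` and `needs_refresh` are hypothetical helpers, and `sts_response` is the dictionary returned by `assume_role()`, whose `Expiration` value is a timezone-aware datetime.

```python
import datetime


def session_kwargs(sts_response, region_name):
    """Map an assume_role() response to boto3 client/resource keyword arguments."""
    credentials = sts_response['Credentials']
    return {
        'region_name': region_name,
        'aws_access_key_id': credentials['AccessKeyId'],
        'aws_secret_access_key': credentials['SecretAccessKey'],
        'aws_session_token': credentials['SessionToken'],
    }


def needs_refresh(sts_response, now, margin_seconds=60):
    """Return True when the credentials expire within margin_seconds of now."""
    expiration = sts_response['Credentials']['Expiration']
    return (expiration - now).total_seconds() < margin_seconds
```

The resource can then be created with `boto3.resource('dynamodb', **session_kwargs(sts_response, 'eu-west-1'))`, and `needs_refresh()` indicates when to call `assume_role()` again if the response is cached between invocations.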

In order to test the code, I recommend that you create a test event. The simplest way to do this is to print the Amazon Kinesis event in the Lambda function, copy it from CloudWatch Logs, and convert it into valid JSON. You can then use it when you select the Lambda function and choose Actions, Configure Test Event. After you are happy with the records in DynamoDB, you can choose Event Source, Disable to disable the mapping between the Lambda function and Amazon Kinesis.
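
If you do not have a logged event to hand, a test event can also be constructed. The sketch below builds a minimal event containing only the fields that the function in this post reads; `make_kinesis_test_event` is a hypothetical helper, the sample values are made up, and a real event carries additional fields.

```python
import base64
import json


def make_kinesis_test_event(payload, sequence_number, shard_id='shardId-000000000000'):
    """Build a minimal Kinesis-style Lambda event wrapping one JSON payload."""
    # Kinesis delivers the record data to Lambda as a base64-encoded string.
    data = base64.b64encode(json.dumps(payload).encode('utf-8')).decode('ascii')
    return {
        'Records': [{
            'eventID': '{}:{}'.format(shard_id, sequence_number),
            'eventSource': 'aws:kinesis',
            'eventName': 'aws:kinesis:record',
            'kinesis': {
                'partitionKey': 'test-partition-key',
                'sequenceNumber': sequence_number,
                'data': data,
            },
        }]
    }


event = make_kinesis_test_event({'event': 'page view'},
                                '49545115243490985018280067714973144582180062593244200802')
print(json.dumps(event, indent=2))
```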

Here’s the Python code to write an Amazon Kinesis record to the DynamoDB table:

    tableName = 'prod-raven-lambda-webanalytics-crossaccount'
    for record in event['Records']:
        try:            
            payload_json = json.loads(base64.b64decode(record['kinesis']['data']))
            payload_json['sequenceNumber'] = record['kinesis']['sequenceNumber']
            payload_json['shardId'] = record['eventID'].split(':')[0]
            payload_json['seqNumbershardIdHash'] = payload_json['sequenceNumber'][-3:-1]+'_'+ payload_json['shardId']
            items={}
            for k, v in payload_json.items():
                if v is not None and v != '':
                    items[k] = v
            table = dynamodb.Table(tableName)
            response  = table.put_item(Item=items)
        except Exception as e:
            # Log the failure and continue with the rest of the batch
            print(e.__doc__)
            print(e)
    return 'Successfully processed {} records.'.format(len(event['Records']))

The first part of the source snippet iterates over the batch of events that were sent to the Lambda function by the Amazon Kinesis event source, and decodes each one from base64. Then the fields used for the DynamoDB hash and range keys are extracted directly from the Amazon Kinesis record. This helps avoid storing duplicate records, makes processing idempotent, and allows for rapid retrieval without the need to do a full table scan (more on this later). Each record is then converted into the DynamoDB format, and finally written to the target DynamoDB table with the put_item() function.
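
The conversion described above can be isolated into a pure function, which makes it easy to test without AWS access. This is a sketch of the same logic; `kinesis_record_to_item` is a hypothetical name, and the sample record is made up.

```python
import base64
import json


def kinesis_record_to_item(record):
    """Convert one Kinesis event record into a DynamoDB item dictionary."""
    payload = json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8'))
    sequence_number = record['kinesis']['sequenceNumber']
    shard_id = record['eventID'].split(':')[0]
    payload['sequenceNumber'] = sequence_number  # range key
    payload['shardId'] = shard_id
    # Hash key: two digits of the sequence number plus the shard ID.
    payload['seqNumbershardIdHash'] = sequence_number[-3:-1] + '_' + shard_id
    # DynamoDB rejects empty strings here, so drop empty and null values.
    return {k: v for k, v in payload.items() if v is not None and v != ''}


sample = {
    'eventID': 'shardId-000000000000:49545115243490985018280067714973144582180062593244200802',
    'kinesis': {
        'sequenceNumber': '49545115243490985018280067714973144582180062593244200802',
        'data': base64.b64encode(b'{"event": "page view", "referrer": ""}').decode('ascii'),
    },
}
print(kinesis_record_to_item(sample))
```

Note that if your payload contains floating point numbers, they may need converting (for example to Decimal) before the Boto 3 resource interface accepts them.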

As the data is now persisted in the POC environment, data scientists and analysts are free to experiment on the latest production data without having access to production or worrying about impacting the live website.

Records in the POC DynamoDB table

Now I’ll give technical details on how you can ensure that writes to DynamoDB are idempotent, and describe different options for querying the data. The pattern I describe can be more widely used as it works with any Amazon Kinesis records. The idea is to use the Amazon Kinesis sequence number, which is unique per stream, as the range key, and a subset of it as part of a composite hash key. In our experiments, we found that taking the last two minus one digits (here, the 80 in red) from the sequence number gave a better distribution than taking the last two (02).

This schema ensures that you do not have duplicate Amazon Kinesis records without the need to maintain any state or position in the client, or change the records in any way, which is well suited for a Lambda function. For example, if you deleted the Amazon Kinesis event source of the Lambda function and added a new one an hour later with a trim horizon, the existing DynamoDB rows would simply be overwritten, as the replayed records would have identical hash and range keys. Not having duplicate records is very useful for any incremental loads into other systems and reduces the deduplication work needed downstream, and also allows us to readily replay Amazon Kinesis records.
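
The effect of replaying a stream can be illustrated with a small in-memory simulation, where a dictionary keyed by (hash, range) stands in for the DynamoDB table. This is only a sketch to show the idea; `put_item` and `replay` are hypothetical stand-ins, not DynamoDB calls.

```python
def put_item(table, item):
    """Write an item keyed by (hash, range); an existing key is overwritten."""
    key = (item['seqNumbershardIdHash'], item['sequenceNumber'])
    table[key] = item


def replay(table, items):
    """Write every item and return the resulting number of rows."""
    for item in items:
        put_item(table, item)
    return len(table)


shard = 'shardId-000000000000'
items = [
    {'seqNumbershardIdHash': seq[-3:-1] + '_' + shard, 'sequenceNumber': seq, 'event': 'page view'}
    for seq in ('1000802', '1000814', '1000826')
]
table = {}
first = replay(table, items)   # initial load
second = replay(table, items)  # same records replayed from the trim horizon
print(first, second)           # the row count does not grow on replay
```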

An example of a populated DynamoDB table could look like the following graphic.

Notice that the composite key of hash and range is unique and that other fields captured can be useful in understanding the user click stream. For example you can see that if you run a query on attribute useremail_ and order by nanotimestamp_, you obtain the clickstream of an individual user. However, this means that DynamoDB has to analyze the whole table in what is known as a table scan, as you can only query on hash and range keys.

One solution is to use a global secondary index, as discussed earlier; however, you might suffer from the data writes being throttled in the main table should the index writes be throttled (AWS is working on an asynchronous update process). This can happen if the hash/range keys are not well distributed. If you need to use the index, one solution is to use a scalable pattern with global secondary indexes.

Another way is to use DynamoDB Streams as an event source to trigger another Lambda function that writes to a table specifically used for querying by user email, e.g., the hash could be useremail_ and the range could be nanotimestamp_. This also means that you are no longer limited to five indexes per main table and the throttling issues mentioned earlier do not affect the main table.

Using DynamoDB Streams is also more analytics-centric as it gives you dynamic flexibility on the primary hash and range, allowing data scientists to experiment with different configurations. In addition, you might use a Lambda function to write the records to one table for the current week and another one for the following week; this allows you to keep the most recent hot data in a smaller table, and older or colder data in separate tables that could be removed or archived on a regular basis.
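
As an illustration of the weekly tables idea, the target table name could be derived from the event time. This is a sketch only: `weekly_table_name` and the `poc-webanalytics` prefix are hypothetical, and it assumes you choose ISO week numbering.

```python
import datetime


def weekly_table_name(event_time, prefix='poc-webanalytics'):
    """Return a table name such as 'poc-webanalytics-2016-w01' for the event's ISO week."""
    iso_year, iso_week, _ = event_time.isocalendar()
    return '{}-{}-w{:02d}'.format(prefix, iso_year, iso_week)


print(weekly_table_name(datetime.date(2016, 1, 4)))
```

Using the ISO year rather than the calendar year keeps days at the turn of the year in the same table as the rest of their week.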

Persisting records to S3 via Amazon Kinesis Firehose

Now that you have the data in DynamoDB, you can enable DynamoDB Streams to obtain a stream of events which can be fully sourced and processed in the POC. For example, you could perform time series analysis on this data or persist the data to S3 or Amazon Redshift for batch processing.

To write to S3, use a Lambda function that reads the records from DynamoDB Streams and writes them to Amazon Kinesis Firehose. Other similar approaches exist but they rely on having a running server with custom deployed code. Firehose is a fully managed service for delivering real-time streaming data to destinations such as S3.

First, set up an IAM policy and associate it with an IAM role so that the Lambda function can write to Firehose.

Create an IAM policy

  1. Open the IAM console.
  2. On the navigation pane, choose Policies, Create Policy.
  3. Choose Create Your Own Policy and configure the following:
    • For Policy Name, enter poc-raven-lambda-firehose.
    • For Description, enter “Read access to DynamoDB, Put records to Kinesis Firehose, and Lambda execution rights” or similar text.
    • For Policy Document, insert the following JSON:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:DescribeDeliveryStream",
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch",
                "firehose:UpdateDestination"
            ],
            "Resource": [
                "*"
            ]
        }

    ]
}

Next, create an IAM role which uses this policy, so that Lambda can assume an identity with the required privileges for reading from DynamoDB Streams and writing to Amazon Kinesis Firehose.

Create an IAM role

  1. Open the IAM console.
  2. On the navigation pane, choose Roles, Create New Role.
  3. For Set Role Name, enter lambdadynamostreams_firehose and choose Next step.
  4. For Role Type, choose AWS Service Roles, AWS Lambda.
  5. Select the policy created earlier, poc-raven-lambda-firehose, and choose Next step.
  6. Choose Review, Create Role.

Create the Lambda function

  1. Open the Lambda console.
  2. Choose Create a Lambda function and select the dynamo-process-stream-python blueprint.
  3. Configure the event sources:
    • For Event Source type, choose DynamoDB.
    • For DynamoDB Table, enter prod-raven-lambda-webanalytics-crossaccount.
    • For Batch Size, enter 300 (this depends on the frequency that events are added to DynamoDB).
    • For Starting position, choose Trim horizon.
  4. Configure the function:
    • For Name, enter PocLambdaFirehose.
    • For Runtime, choose Python 2.7.
    • For Edit code inline, add the Lambda function source code (see code below).
    • For Handler, choose lambda_function.lambda_handler.
    • For Role, select the role you created earlier, e.g., lambdadynamostreams_firehose. This grants the Lambda function access to DynamoDB Streams, Firehose, and CloudWatch metrics and logs.
    • For Memory (MB), choose 128.
    • For Timeout, enter 5 min 0 sec.
  5. Review the configuration:
    • For Enable event source, choose Enable later (this can be enabled later via the Event sources tab).
    • Choose Create function.
from __future__ import print_function
import json
import boto3

def lambda_handler(event, context):
    firehose_client = boto3.client(service_name='firehose', region_name='eu-west-1')
    stream_name = 'prod-s3-firehose'  # name of the Firehose delivery stream (created in the next section)
    for record in event['Records']:
        output_records = {}
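        # Keep scalar attributes only (string, number, binary) and drop the type wrapper
        for key, value in record['dynamodb']['NewImage'].items():
            value_type = list(value.keys())[0]
            if value_type in ['S', 'N', 'B']:
                output_records[key] = value[value_type]
        kinesis_data = "".join([json.dumps(output_records), "\n"])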
        put_record(stream_name, kinesis_data, firehose_client)
    return 'done'

def put_record(stream_name, data, client):    
    client.put_record(DeliveryStreamName=stream_name,
                      Record={'Data':data}) 

First, the Lambda function iterates over all records and values, and flattens the JSON data structure from the DynamoDB format to JSON records, e.g., {'event': {'S': 'page view'}} becomes {'event': 'page view'}. The record is then encoded as JSON, terminated with a newline, and sent to Firehose.
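
The flattening step can be sketched as a standalone function that is easy to test locally. `flatten_image` is a hypothetical name, and the sample image below is made up.

```python
import json


def flatten_image(new_image, scalar_types=('S', 'N', 'B')):
    """Flatten a DynamoDB-typed image, keeping only scalar attributes."""
    flat = {}
    for name, typed_value in new_image.items():
        # Each attribute looks like {'S': 'page view'}: one type tag, one value.
        for type_name, value in typed_value.items():
            if type_name in scalar_types:
                flat[name] = value
    return flat


image = {
    'event': {'S': 'page view'},
    'count': {'N': '3'},
    'tags': {'L': [{'S': 'a'}]},  # non-scalar, so it is skipped
}
print(json.dumps(flatten_image(image), sort_keys=True))
```

Note that DynamoDB Streams delivers numbers as strings, so `count` stays `'3'` unless you convert it yourself.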

For testing, you can log events from your Amazon Kinesis stream by printing them as JSON to standard out; Lambda automatically delivers this information into CloudWatch Logs. This information can then be used as the test event in the Lambda console.
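
Alternatively, a minimal test event for this function can be built by hand. The sketch below includes only the fields that the function reads; `make_dynamodb_test_event` is a hypothetical helper, and a real DynamoDB Streams event carries more fields. It assumes the stream view type includes the new image.

```python
import json


def make_dynamodb_test_event(new_image, event_name='INSERT'):
    """Build a minimal DynamoDB Streams-style Lambda event with one record."""
    return {
        'Records': [{
            'eventID': '1',
            'eventName': event_name,
            'eventSource': 'aws:dynamodb',
            'dynamodb': {
                # NewImage holds the item as written, in DynamoDB-typed JSON.
                'NewImage': new_image,
                'StreamViewType': 'NEW_AND_OLD_IMAGES',
            },
        }]
    }


test_event = make_dynamodb_test_event({'event': {'S': 'page view'}})
print(json.dumps(test_event, indent=2))
```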

Configuring the Amazon Kinesis Firehose delivery stream

  1. Open the Amazon Kinesis Firehose console.
  2. Choose Create Delivery Stream.
    • For Destination, choose Amazon S3.
    • For Delivery stream name, enter prod-s3-firehose.
    • For S3 bucket, choose Create new S3 bucket or Use an existing S3 bucket.
    • For S3 prefix, enter prod-kinesis-data.
  3. Choose Next to change the following Configuration settings.
    • For Buffer size, enter 5.
    • For Buffer interval, enter 300.
    • For Data compression, choose UNCOMPRESSED.
    • For Data Encryption, choose No Encryption.
    • For Error Logging, choose Enable.
    • For IAM role, choose Firehose Delivery IAM Role. This opens a new window: Amazon Kinesis Firehose is requesting permission to use resources in your account.
    • For IAM Role, choose Create a new IAM role.
    • For Role Name, choose firehosedeliveryrole.
  4. After the policy document is automatically generated, choose Allow.
  5. Choose Next.
  6. Review your settings and choose Create Delivery Stream.
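
For reference, the console settings above correspond roughly to the following arguments if you create the delivery stream through the SDK instead. This is a sketch under assumptions: `delivery_stream_args` is a hypothetical helper, and the bucket and role ARNs are placeholders you would supply.

```python
def delivery_stream_args(bucket_arn, role_arn, name='prod-s3-firehose',
                         prefix='prod-kinesis-data'):
    """Build arguments for the Firehose create_delivery_stream call."""
    return {
        'DeliveryStreamName': name,
        'ExtendedS3DestinationConfiguration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'Prefix': prefix,
            # Firehose flushes to S3 when either threshold is reached first:
            # 5 MB of buffered data or 300 seconds.
            'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
            'CompressionFormat': 'UNCOMPRESSED',
        },
    }
```

With a boto3 Firehose client, this could be used as `boto3.client('firehose').create_delivery_stream(**delivery_stream_args(bucket_arn, role_arn))`.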

After the Firehose delivery stream is created, select the stream and choose Monitoring to see activity. The following graphic shows some example metrics.

After the DynamoDB Streams event source for the Lambda function is enabled, new objects with production Amazon Kinesis records passing through the POC DynamoDB table are automatically created in the POC S3 bucket using Firehose. Firehose can also be used to import data to Amazon Redshift or Amazon Elasticsearch Service.

Benefits of serverless processing

Using a Lambda function for stream processing enables the following benefits:

  • Flexibility – Analytics code can be changed on the fly.
  • High availability – Runs in multiple Availability Zones in a region.
  • Zero maintenance – No upgrades of running instances are needed; all services are supported by AWS.
  • Security – No use of keys or passwords. IAM roles can be used to integrate with Amazon Kinesis Streams, Amazon Kinesis Firehose, and DynamoDB.
  • Serverless compute – There are no EC2 instances or clusters to set up and maintain.
  • Automatic scaling – The number of Lambda functions invoked changes depending on the number of writes to Amazon Kinesis. Note that the upper concurrent limit is the number of Amazon Kinesis streams and DynamoDB Streams shards.
  • Low cost – You pay only for the execution time of the Lambda function.
  • Ease of use – Easy selection of language for functions, and very simple functions. In these examples, the functions just iterate over and parse JSON records.

Summary

In this post, I described how to persist events from an Amazon Kinesis stream into DynamoDB in another AWS account, without needing to build and host your own server infrastructure or use any keys. I also showed how to get the most out of DynamoDB, and proposed a schema that eliminates duplicate Amazon Kinesis records and reduces the need for further de-duplication downstream.

In a few lines of Python code, this pattern has allowed the data engineers at JustGiving the flexibility to project, transform, and enrich the Amazon Kinesis stream events as they arrive. Our data scientists and analysts now have full access to the production data in near-real time but in a different AWS account, allowing them to quickly undertake streaming analysis, project a subset of the data, and run experiments.

I also showed how to persist that data stream to S3 using Amazon Kinesis Firehose. This allows us to quickly access the historical clickstream dataset, without needing a permanent server or cluster running.

If you have questions or suggestions, please comment below. You may also be interested in seeing other material that digs deeper into the capabilities of Lambda and DynamoDB.