AWS Big Data Blog

Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount

by Allan MacInnis

Allan MacInnis is a Kinesis Solution Architect for Amazon Web Services.

Starting today, you can easily scale your Amazon Kinesis streams to respond in real time to changes in your streaming data needs. Customers use Amazon Kinesis to capture, store, and analyze terabytes of data per hour from clickstreams, financial transactions, social media feeds, and more.

With the new Amazon Kinesis Streams UpdateShardCount API operation, you can automatically scale your stream shard capacity by using Amazon CloudWatch alarms, Amazon SNS, and AWS Lambda. In this post, I walk you through an example of how you can automatically scale your shards using a few lines of code.

Getting started

In this example, I only demonstrate how to scale out capacity within your Amazon Kinesis stream. However, you can implement a similar strategy for when you need to scale in the number of shards in your stream, as the approach is nearly identical.

Consider a stream containing two shards, with a producer that is using a random partition key. As a reminder, each shard in a stream can ingest up to 1000 records per second, or up to 1 MB per second of data. If your data producer exceeds either of those values, Amazon Kinesis Streams raises an exception, and your producer needs to retry records that did not get written successfully. Retrying failed records is a valid approach if the spike is very short-lived. However, to ingest more than 1000 records per second for a longer duration, you need to scale the number of shards in your stream. Instead of scaling manually, how about building a system that can automatically scale the shards for you?
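
For the short-lived spike case, the retry approach described above could look roughly like the following. This is a minimal sketch, not a production producer: the function name put_records_with_retry and its max_attempts and base_delay parameters are hypothetical, and it assumes the caller passes in a boto3 Kinesis client and keeps each batch within the PutRecords per-request limits. It relies on the PutRecords response reporting FailedRecordCount and returning per-record results in request order, with an ErrorCode on each failed entry.

```python
import time


def put_records_with_retry(client, stream_name, records,
                           max_attempts=5, base_delay=0.1):
    """Write records with PutRecords, resending only the entries that failed.

    Returns the records that were still unwritten after max_attempts.
    """
    pending = list(records)
    if not pending:
        return []
    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response.get('FailedRecordCount', 0) == 0:
            return []
        # Results come back in request order; failed entries carry an ErrorCode.
        pending = [
            record
            for record, result in zip(pending, response['Records'])
            if 'ErrorCode' in result
        ]
        if attempt < max_attempts - 1:
            # Exponential backoff before the next attempt (hypothetical policy).
            time.sleep(base_delay * (2 ** attempt))
    return pending
```

A caller would pass boto3.client('kinesis') as client. If the returned list is regularly non-empty, the spike is not short-lived and the stream needs more shards.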

Pick your CloudWatch scaling thresholds

You need to consider the threshold at which the system should automatically add shards. For this post, double the total shard count of your stream when the number of records per second being written to the stream reaches 80% of the total records per second capacity. So, for a two-shard stream, the limit is 2000 records per second. You want the threshold to be set to 80% of that, or 1600 records per second. When this threshold is breached, you want the system to double the shard count in the stream to four, scaling up to a total ingestion capacity of 4000 records per second.

Create an alarm on the IncomingRecords metric of your stream, with a threshold value set to greater than or equal to 480000 for the Sum value of this metric. Set the alarm to monitor over a single period of five minutes. In other words, if the total number of records written to your stream over a five-minute period reaches 480000, double the number of shards in the stream. The 480000 value is calculated using the 80% criterion (800 records per second per shard), multiplied by the number of shards in the stream (2), multiplied by the number of seconds in the period (300).
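
The threshold arithmetic can be written down as a small helper. For two shards over a 300-second period it works out to 800 × 2 × 300 = 480000. The sketch below also assembles the arguments for a CloudWatch put_metric_alarm call. The function names, the alarm naming scheme, and the topic ARN in the test values are hypothetical. The AWS/Kinesis namespace, the IncomingRecords metric, and the StreamName dimension are the ones Kinesis Streams publishes to CloudWatch.

```python
RECORDS_PER_SHARD_PER_SECOND = 1000  # per-shard ingest limit cited in this post


def scale_out_threshold(shard_count, period_seconds=300, utilization=0.8):
    """Sum of IncomingRecords over one period that represents 80% utilization."""
    return round(shard_count * RECORDS_PER_SHARD_PER_SECOND
                 * utilization * period_seconds)


def scale_out_alarm_params(stream_name, shard_count, topic_arn, period_seconds=300):
    """Build keyword arguments for cloudwatch.put_metric_alarm (a sketch)."""
    return {
        'AlarmName': stream_name + '-scale-out',  # hypothetical naming scheme
        'Namespace': 'AWS/Kinesis',
        'MetricName': 'IncomingRecords',
        'Dimensions': [{'Name': 'StreamName', 'Value': stream_name}],
        'Statistic': 'Sum',
        'Period': period_seconds,
        'EvaluationPeriods': 1,
        'Threshold': scale_out_threshold(shard_count, period_seconds),
        'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
        'AlarmActions': [topic_arn],
    }
```

With a boto3 CloudWatch client named cw, the alarm would then be created with cw.put_metric_alarm(**scale_out_alarm_params(...)).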

Automatically adjust your CloudWatch scaling thresholds

You also need to adjust the alarm threshold automatically to match the new shard capacity. For this example, the threshold moves to 80% of your new capacity (3200 records per second). To drive both changes, configure the CloudWatch alarm with an action that publishes to an SNS topic when the alarm is triggered.

You can then create a Lambda function that subscribes to this SNS topic and executes a call to the new UpdateShardCount API operation while adjusting the CloudWatch alarm threshold. To learn how to configure a CloudWatch alarm, see Creating Amazon CloudWatch Alarms. For information about how to invoke a Lambda function from SNS, see Invoking Lambda Functions Using Amazon SNS Notifications.

To scale in your shards, use a similar strategy: create a second CloudWatch alarm that is triggered when the input drops below a threshold, and have the Lambda function halve both the number of shards and the CloudWatch alarm thresholds.
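
The doubling and halving rules can be captured in one pure function that a handler could call for either direction. This is only a sketch under stated assumptions: the name plan_scaling and the 'out'/'in' direction values are hypothetical, and rounding up when halving an odd shard count is a choice made here so that the target is never less than half of the current count.

```python
def plan_scaling(open_shards, threshold, direction):
    """Return (target_shard_count, new_alarm_threshold) for one scaling step."""
    if direction == 'out':
        # Double the shards and the alarm threshold together.
        return open_shards * 2, threshold * 2
    if direction == 'in':
        if open_shards < 2:
            # A stream needs at least one shard, so there is nothing to halve.
            return open_shards, threshold
        # Round up so an odd count never drops below half of the current count.
        target = (open_shards + 1) // 2
        # Keep the threshold proportional to the new shard count.
        return target, threshold * target / open_shards
    raise ValueError("direction must be 'out' or 'in'")
```

For example, plan_scaling(2, 480000, 'out') gives four shards and a threshold of 960000, and the 'in' direction reverses that step.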

Example Lambda function

The following Python code doubles the shard count and adjusts the threshold for the alarm that triggered the scaling action. You can create a second, similar function to handle scale-in actions, or just adjust this one with some conditional logic to handle both scenarios.

import json
import boto3

kinesis = boto3.client('kinesis')
cw = boto3.client('cloudwatch')

def lambda_handler(event, context):
    message = json.loads(event['Records'][0]['Sns']['Message'])

    streamName = message['Trigger']['Dimensions'][0]['value']
    stream = kinesis.describe_stream(
        StreamName=streamName
    )

    #determine the number of open shards in the stream; describe_stream also
    #lists shards closed by earlier resharding, which have an
    #EndingSequenceNumber and must not count toward TargetShardCount
    def countOpenShards(shards):
        return sum(1 for s in shards
                   if s['SequenceNumberRange'].get('EndingSequenceNumber') is None)

    shards = stream['StreamDescription']['Shards']
    totalShardCount = countOpenShards(shards)
    #page through the remaining shards, starting after the last shard seen
    while(stream['StreamDescription']['HasMoreShards']):
        stream = kinesis.describe_stream(
            StreamName=streamName,
            ExclusiveStartShardId=shards[-1]['ShardId']
        )
        shards = stream['StreamDescription']['Shards']
        totalShardCount += countOpenShards(shards)
    
    #double the shard count in the stream
    kinesis.update_shard_count(
        StreamName=streamName,
        TargetShardCount=totalShardCount * 2,
        ScalingType='UNIFORM_SCALING'
    )

    #double the threshold for the CloudWatch alarm that triggered this function
    cw.put_metric_alarm(
        AlarmName=message['AlarmName'],
        AlarmActions=[event['Records'][0]['Sns']['TopicArn']],
        MetricName=message['Trigger']['MetricName'],
        Namespace=message['Trigger']['Namespace'],
        Statistic=message['Trigger']['Statistic'].title(),
        Dimensions=[
            {
                'Name': message['Trigger']['Dimensions'][0]['name'],
                'Value': message['Trigger']['Dimensions'][0]['value']
            }
        ],
        Period=message['Trigger']['Period'],
        EvaluationPeriods=message['Trigger']['EvaluationPeriods'],
        Threshold=message['Trigger']['Threshold'] * 2,
        ComparisonOperator=message['Trigger']['ComparisonOperator']
    )
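
To try the message handling locally before wiring up SNS, it can help to pull the parsing into its own function and feed it a hand-built event. The sketch below assumes the alarm notification has the fields the function above reads (AlarmName, and a Trigger object with Dimensions, Threshold, and so on). The function names, the stream name, the alarm name, and the topic ARN are all made up for illustration. Unlike the handler above, it looks up the StreamName dimension by name rather than by position.

```python
import json


def parse_alarm_event(event):
    """Extract the fields the scaling function needs from an SNS-wrapped alarm."""
    sns = event['Records'][0]['Sns']
    message = json.loads(sns['Message'])
    trigger = message['Trigger']
    # Index dimensions by name so the lookup does not depend on their order.
    dimensions = {d['name']: d['value'] for d in trigger['Dimensions']}
    return {
        'alarm_name': message['AlarmName'],
        'topic_arn': sns['TopicArn'],
        'stream_name': dimensions['StreamName'],
        'threshold': trigger['Threshold'],
    }


def sample_event():
    """Build a hypothetical event for local testing; all values are made up."""
    message = {
        'AlarmName': 'demo-stream-scale-out',
        'Trigger': {
            'MetricName': 'IncomingRecords',
            'Namespace': 'AWS/Kinesis',
            'Statistic': 'SUM',
            'Dimensions': [{'name': 'StreamName', 'value': 'demo-stream'}],
            'Period': 300,
            'EvaluationPeriods': 1,
            'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
            'Threshold': 480000.0,
        },
    }
    return {'Records': [{'Sns': {
        'TopicArn': 'arn:aws:sns:us-east-1:123456789012:demo-topic',
        'Message': json.dumps(message),
    }}]}
```

Calling parse_alarm_event(sample_event()) returns the stream name, alarm name, topic ARN, and current threshold without touching any AWS service.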

Summary

To learn more about UpdateShardCount, see the AWS documentation. If you have any questions or feedback related to the content in this post or UpdateShardCount, please comment below or in the AWS Forums.