AWS Big Data Blog

Encrypt and Decrypt Amazon Kinesis Records Using AWS KMS

Customers with strict compliance or data security requirements often require data to be encrypted at all times, including at rest or in transit within the AWS cloud. This post shows you how to build a real-time streaming application using Kinesis in which your records are encrypted while at rest or in transit.

Amazon Kinesis overview

The Amazon Kinesis platform enables you to build custom applications that analyze or process streaming data for specialized needs. Amazon Kinesis can continuously capture and store terabytes of data per hour from hundreds of thousands of sources such as website clickstreams, financial transactions, social media feeds, IT logs, and transaction tracking events.

Through the use of HTTPS, Amazon Kinesis Streams encrypts data in-flight between clients which protects against someone eavesdropping on records being transferred. However, the records encrypted by HTTPS are decrypted once the data enters the service. This data is stored at rest for 24 hours (configurable up to 168 hours) to ensure that your applications have enough headroom to process, replay, or catch up if they fall behind.

Walkthrough

In this post you build encryption and decryption into sample Kinesis producer and consumer applications using the Amazon Kinesis Producer Library (KPL), the Amazon Kinesis Consumer Library (KCL), AWS KMS, and the aws-encryption-sdk. The methods and the techniques used in this post to encrypt and decrypt Kinesis records can be easily replicated into your architecture. Some constraints:

  • AWS charges for the use of KMS API requests for encryption and decryption, for more information see AWS KMS Pricing.
  • You cannot use Amazon Kinesis Analytics to query Amazon Kinesis Streams with records encrypted by clients in this sample application.
  • If your application requires low latency processing, note that there will be a slight hit in latency.

The following diagram shows the architecture of the solution.

Encrypting the records at the producer

Before you call the PutRecord or PutRecords API, you will encrypt the string record by calling KinesisEncryptionUtils.toEncryptedString.

In this example, we used a sample stock sales ticker object:

example {"tickerSymbol": "AMZN", "salesPrice": "900", "orderId": "300", "timestamp": "2017-01-30 02:41:38"}. 

The method (KinesisEncryptionUtils.toEncryptedString) call takes four parameters:

  • amazonaws.encryptionsdk.AwsCrypto
  • stock sales ticker object
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • util.Map of an encryption context

A ciphertext is returned back to the main caller which is then also checked for size by calling KinesisEncryptionUtils.calculateSizeOfObject. Encryption increases the size of an object. To prevent the object from being throttled, the size of the payload (one or more records) is validated to ensure it is not greater than 1MB. In this example encrypted records sizes with payload exceeding 1MB are logged as warning. If the size is less than the limit, then either addUserRecord or PutRecord and PutRecords are called if you are using the KPL or the Kinesis Streams API respectively 

Example: Encrypting records with KPL

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context);
log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000)
   log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encrypted record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
//Adding the encrypted record to stream
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);
Futures.addCallback(f, callback);

In the above code, the example sales ticker record is passed to the KinesisEncryptionUtils.toEncryptedString and an encrypted record is returned. The encryptedRecord value is also passed to KinesisEncryptionUtils.calculateSizeOfObject and the size of the encrypted payload is returned and checked to see if it is less than 1MB. If it is, the payload is then UTF-8 encoded (KinesisEncryptionUtils.toEncryptedByteStream), then sent to the stream for processing.

Example: Encrypting the records with Streams PutRecord

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000)
    log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encryptyed record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
putRecordRequest.setData(data);
putRecordRequest.setPartitionKey(randomPartitionKey());
//putting the record into the stream
kinesis.putRecord(putRecordRequest);

Verifying that records are encrypted

After the call to KinesisEncryptionUtils.toEncryptedString, you can print out the encrypted string record just before UTF-8 encoding. An example of what is printed to standard output when running this sample application is shown below.

[main] INFO kinesisencryption.streams.EncryptedProducerWithStreams - String Record is TickerSalesObject{tickerSymbol='FB', salesPrice='184.285409142', orderId='2a0358f1-9f8a-4bbe-86b3-c2929047e15d', timeStamp='2017-01-30 02:41:38'} and Encrypted Record String is AYADeMf6zmVg9JvIkGNv5M39rhUAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFpUkpCaG1UOFQ3UTZQZ253dm9FSU9iUDZPdE1xTHdBZ1JjNlZxN2doMDZ3QlBEWUZndWdJSEFKaXNvT0ZPUGsrdz09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDCCYBk+hfB3tOGVx7QIBEIA7FqaEcOWpic+gKNeT+dUe4yttB9dsZSFPAUTlz2L2zlyLXSLMh1otRH24SO485ov+TCTtRCgiA8a9rYQCAAAAAAwAABAArlGWPO8BavNSJIpJOtJekRUhOwbM+WM1NBVXB/////8AAAABXNZnRND3J7u8EZx3AAAAkfSxVPMYUv0Ovrd4AIUTmMcaiR0Z+IcJNAXqAhvMmDKpsJaQG76Q6pYExarolwT+6i87UOi6TGvAiPnH74GbkEniWe66rAF6mOra2JkffK6pBdhh95mEOGLaVPBqs2jswUTfdcBJQl9NEb7wx9XpFX8fNDF56Vly7u6f8OQ7lY6fNrOupe5QBFnLvwehhtogd72NTQ/yEbDDoPKUZN3IlWIEAGYwZAIwISFw+zdghALtarsHSIgPMs7By7/Yuda2r3hqSmqlCyCXy7HMFIQxHcEILjiLp76NAjB1D8r8TC1Zdzsfiypi5X8FvnK/6EpUyFoOOp3y4nEuLo8M2V/dsW5nh4u2/m1oMbw=

You can also verify that the record stayed encrypted in Streams by printing out the UTF-8 decoded received record immediately after the getRecords API call. An example of the print output when running the sample application is shown below.

[Thread-2] INFO kinesisencryption.utils.KinesisEncryptionUtils - Verifying object received from stream is encrypted. -Encrypted UTF-8 decoded : AYADeBJz/kt7Fm3L1lvS8Wy8jhAAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFrM2N4K2s1ODJuOGVlNWF3TVJ1dk1UUHZQc2FHeGoxQisxb09kNWtDUExHYjJDS0lMZW5LSnlYakRmdFR4dzQyUT09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDAGI3oWLlIJ2p6kffQIBEIA7JVUOTsLtEyNK8vS4GIS9iyTejuB2xhIpRXfG8o0lUfHawcrCbNbNH8XLm/8RW5JbgXo10EpOs8dSjkICAAAAAAwAABAAy64r24sGVKWN4C1gXCwJYHvZkLpJJj16SZlhpv////8AAAABg2pPFchIiaM7D9VuAAAAkwh10ul5sZQ08KsgkFszOOvFoQu95CiY7cK8H+tBloVOZglMqhhhvoIIZLr9hmI8/lQvRXzGDdo7Xkp0FAT5Jpztt8Hq/ZuLfZtNYIWOw594jShqpZt6uXMdMnpb/38R3e5zLK5vrYkM6NS4WPMFrHsOKN5tn0CDForgojRcdpmCJ8+cWLNltb2S+EJiWiyWS+ibw2vJ/RFm6WZO6nD+MXn3vyMAZzBlAjAuIUTYL1cbQ3ENxDIeXHJAWQguNPqxq4HgaCmCEI9/rn/GAKSc2nT9ln3UsVq/2dgCMQC7yNJ3DCTnppavfxTbcVS+rXaDDpZZx/ZsluMqXAFM5/FFvKRqr0dVML28tGunxmU=

Decrypting the records at the consumer

After you receive the records into your consumer as a list, you can get the data as a ByteBuffer by calling record.getData. You then decode and decrypt the byteBuffer by calling the KinesisEncryptionUtils.decryptByteStream. This method takes five parameters:

  • amazonaws.encryptionsdk.AwsCrypto
  • record ByteBuffer
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • key arn string
  • java.util.Map of your encryption context

A string representation of the ticker sales object is returned back to the caller for further processing. In this example, this representation is just printed to standard output.

[Thread-2] INFO kinesisencryption.streams.DecryptShardConsumerThread - Decrypted Text Result is TickerSalesObject{tickerSymbol='AMZN', salesPrice='304.958313333', orderId='50defaf0-1c37-4e84-85d7-bc15597355eb', timeStamp='2017-01-30 02:41:38'}

Example: Decrypting records with the KCL and Streams API

ByteBuffer buffer = record.getData();
//Decrypting the encrypted record data
String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto,buffer,prov,this.getKeyArn(), this.getContext());
log.info("Decrypted Text Result is " + decryptedResult);

With the above code, records in the Kinesis Streams are decrypted using the same key ARN and encryption context that was previously used to encrypt it at the producer side.

Maven dependencies

To use the implementation I’ve outlined in this post, you need to use a few maven dependencies outlined below in the pom.xml together with the Bouncy Castle libraries. Bouncy Castle provides a cryptography API for Java.

 <dependency>
        <groupId>org.bouncycastle</groupId>
        <artifactId>bcprov-ext-jdk15on</artifactId>
        <version>1.54</version>
    </dependency>
<dependency>
   <groupId>com.amazonaws</groupId>
   <artifactId>aws-encryption-sdk-java</artifactId>
   <version>0.0.1</version>
</dependency>

Summary

You may incorporate above sample code snippets or use it as a guide in your application code to just start encrypting and decrypting your records to and from an Amazon Kinesis Stream.

A complete producer and consumer example application and a more detailed step-by-step example of developing an Amazon Kinesis producer and consumer application on AWS with encrypted records is available at the kinesisencryption github repository.

If you have questions or suggestions, please comment below.


About the Author

Temitayo Olajide is a Cloud Support Engineer with Amazon Web Services. He works with customers to provide architectural solutions, support and guidance to implementing high velocity streaming data applications in the cloud. In his spare time, he plays ping-pong and hangs out with family and friends

 

 


Related

Secure Amazon EMR with Encryption