AWS Compute Blog
Using AWS Lambda for streaming analytics
AWS Lambda now supports streaming analytics calculations for Amazon Kinesis and Amazon DynamoDB. This allows developers to calculate aggregates in near-real time and pass state across multiple Lambda invocations. This feature provides an alternative way to build analytics in addition to services like Amazon Kinesis Data Analytics.
In this blog post, I explain how this feature works with Kinesis Data Streams and DynamoDB Streams, together with example use-cases.
Overview
For workloads using streaming data, data arrives continuously, often from different sources, and is processed incrementally. Discrete data processing tasks, such as operating on files, have a known beginning and end boundary for the data. For applications with streaming data, the processing function does not know when the data stream starts or ends. Consequently, this type of data is commonly processed in batches or windows.
Before this feature, Lambda-based stream processing was limited to working on the incoming batch of data. For example, in Amazon Kinesis Data Firehose, a Lambda function transforms the current batch of records with no information or state from previous batches. The same applies when processing DynamoDB streams with Lambda functions. This existing approach works well for MapReduce-style processing or tasks focused exclusively on the data in the current batch.
- DynamoDB streams invoke a processing Lambda function asynchronously. After processing, the function may then store the results in a downstream service, such as Amazon S3.
- Kinesis Data Firehose invokes a transformation Lambda function synchronously, which returns the transformed data back to the service.
This new feature introduces the concept of a tumbling window, which is a fixed-size, non-overlapping time interval of up to 15 minutes. To use this, you specify a tumbling window duration in the event-source mapping between the stream and the Lambda function. When you apply a tumbling window to a stream, items in the stream are grouped by window and sent to the processing Lambda function. The function returns a state value that is passed to the next invocation of the tumbling window.
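To make the idea of fixed-size, non-overlapping intervals concrete, here is a minimal local sketch that assigns timestamps to tumbling windows. The helper names and the alignment of windows to multiples of the window size are assumptions for illustration only; Lambda decides the real window boundaries.

```javascript
// Hypothetical helper: returns the tumbling window that contains a timestamp.
// Assumption: windows are aligned to multiples of the window size since the
// Unix epoch. Lambda determines the actual boundaries for a real stream.
function tumblingWindowFor(timestampMs, windowSeconds) {
  const sizeMs = windowSeconds * 1000
  const start = Math.floor(timestampMs / sizeMs) * sizeMs
  return { start, end: start + sizeMs }
}

// Hypothetical helper: groups records by the start of their window
function groupByWindow(records, windowSeconds) {
  const groups = new Map()
  for (const record of records) {
    const { start } = tumblingWindowFor(record.timestampMs, windowSeconds)
    if (!groups.has(start)) groups.set(start, [])
    groups.get(start).push(record)
  }
  return groups
}

// Three made-up records grouped into 30-second windows
const sampleRecords = [
  { timestampMs: 1000, sales: 10 },
  { timestampMs: 29999, sales: 20 },
  { timestampMs: 30000, sales: 5 }
]
const grouped = groupByWindow(sampleRecords, 30)
console.log(grouped)
```

Each record belongs to exactly one window, so the first two records share a window and the third starts the next one.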
This feature is useful in workloads where you need to calculate aggregates continuously. For example, a retailer streaming order information from point-of-sale systems can use it to generate near-live sales data for downstream reporting. Using Lambda to generate aggregates requires only minimal code, and the function can access other AWS services as needed.
Using tumbling windows with Lambda functions
When you configure an event source mapping between Kinesis or DynamoDB and a Lambda function, use the new setting, Tumbling window duration. This appears in the trigger configuration in the Lambda console:
You can also set this value in AWS CloudFormation and AWS SAM templates. After the event source mapping is created, events delivered to the Lambda function have several new attributes, including:
- Window start and end: the beginning and ending timestamps for the current tumbling window.
- State: an object containing the state returned from the previous window, which is initially empty. The state object can contain up to 1 MB of data.
- isFinalInvokeForWindow: indicates if this is the last invocation for the tumbling window. This only occurs once per window period.
- isWindowTerminatedEarly: indicates that the window ended early, which happens only if the state exceeds the maximum allowed size of 1 MB.
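As a rough illustration of these attributes, the sketch below shows an event object using the field names that the function code in this post reads, together with a check of the state size against the 1 MB limit. All values are made up, the helper names are hypothetical, and treating 1 MB as 1024 * 1024 bytes of serialized JSON is an assumption.

```javascript
// Illustrative event for a tumbling window invocation (all values are made up)
const sampleEvent = {
  Records: [],
  window: { start: '2020-12-15T10:00:00Z', end: '2020-12-15T10:00:30Z' },
  state: { sales: 125 },
  shardId: 'shardId-000000000000',
  isFinalInvokeForWindow: false,
  isWindowTerminatedEarly: false
}

// Assumption: the 1 MB limit is measured on the JSON-serialized state
const MAX_STATE_BYTES = 1024 * 1024

// Hypothetical helper: approximate size of the state once serialized
function stateSizeInBytes(state) {
  return Buffer.byteLength(JSON.stringify(state), 'utf8')
}

// Hypothetical helper: true when the state fits within the assumed limit
function isStateWithinLimit(state) {
  return stateSizeInBytes(state) <= MAX_STATE_BYTES
}

console.log(stateSizeInBytes(sampleEvent.state), isStateWithinLimit(sampleEvent.state))
```

A check like this can help keep the state small, for example by storing running totals rather than individual records.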
In any tumbling window, there is a series of Lambda invocations following this pattern:
- The first invocation contains an empty state object in the event. The function returns a state object containing custom attributes that are specific to the custom logic in the aggregation.
- The second invocation contains the state object provided by the first Lambda invocation. This function returns an updated state object with new aggregated values. Subsequent invocations follow this same sequence.
- The final invocation in the tumbling window has the isFinalInvokeForWindow flag set to true. The event contains the state returned by the most recent Lambda invocation. This invocation is responsible for storing the result in S3 or in another data store, such as a DynamoDB table. There is no state returned in this final invocation.
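The sequence above can be sketched locally without any AWS services. In this minimal simulation, `createHandler`, `simulateWindow`, and the `persist` callback are hypothetical names, and the records are simplified to plain objects with a `sales` number rather than real stream records.

```javascript
// Aggregation logic in the same shape as a tumbling window handler.
// `persist` is a hypothetical stand-in for writing to S3 or DynamoDB.
function createHandler(persist) {
  return async (event) => {
    if (event.isFinalInvokeForWindow) {
      // Final invocation: store the result and return no state
      await persist({ window: event.window, sales: event.state.sales || 0 })
      return
    }
    // First invocation receives an empty state object
    const state = Object.keys(event.state).length === 0 ? { sales: 0 } : event.state
    for (const record of event.Records) {
      state.sales += record.sales
    }
    return { state }
  }
}

// Hypothetical local driver that mimics how state is threaded through a window
async function simulateWindow(handler, batches, window) {
  let state = {}
  for (const Records of batches) {
    const result = await handler({ Records, state, window, isFinalInvokeForWindow: false })
    state = result.state
  }
  await handler({ Records: [], state, window, isFinalInvokeForWindow: true })
}
```

For example, calling `simulateWindow` with two batches of made-up records invokes the handler three times: twice to aggregate and once to persist the total.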
Using tumbling windows with DynamoDB
DynamoDB streams can invoke a Lambda function using tumbling windows, enabling you to generate aggregates per shard. In this example, an ecommerce workload saves orders in a DynamoDB table and uses a tumbling window to calculate the near-real time sales total.
First, I create a DynamoDB table to capture the order data and a second DynamoDB table to store the aggregate calculation. I create a Lambda function with a trigger from the first orders table. The event source mapping is created with a Tumbling window duration of 30 seconds:
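For readers who prefer to create the mapping programmatically, the following sketch builds a request for Lambda's CreateEventSourceMapping API, where the window duration is the `TumblingWindowInSeconds` parameter. The helper name, function name, and stream ARN are placeholders, and choosing `LATEST` as the starting position is an assumption rather than the setting used in this walkthrough.

```javascript
// Hypothetical helper: builds a CreateEventSourceMapping request with a
// tumbling window. The 900-second ceiling matches the 15-minute maximum.
function buildTumblingWindowMapping({ functionName, streamArn, windowSeconds }) {
  if (!Number.isInteger(windowSeconds) || windowSeconds < 0 || windowSeconds > 900) {
    throw new RangeError('windowSeconds must be an integer from 0 to 900')
  }
  return {
    FunctionName: functionName,
    EventSourceArn: streamArn,
    StartingPosition: 'LATEST', // assumption: only process new records
    TumblingWindowInSeconds: windowSeconds
  }
}

// Placeholder names and ARN, for illustration only
const exampleParams = buildTumblingWindowMapping({
  functionName: 'tumblingWindowAggregator',
  streamArn: 'arn:aws:dynamodb:us-east-1:123456789012:table/tumblingWindows/stream/2020-12-15T00:00:00.000',
  windowSeconds: 30
})
console.log(exampleParams)

// With the AWS SDK for JavaScript v2, the request could then be sent like this:
// const AWS = require('aws-sdk')
// const lambda = new AWS.Lambda()
// await lambda.createEventSourceMapping(exampleParams).promise()
```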
I use the following code in the Lambda function:
const AWS = require('aws-sdk')
AWS.config.update({ region: process.env.AWS_REGION })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindowsAggregation'
// Returns true when the state object has no attributes (first invocation)
function isEmpty(obj) {
  return Object.keys(obj).length === 0
}
exports.handler = async (event) => {
  // Save aggregation result in the final invocation
  if (event.isFinalInvokeForWindow) {
    console.log('Final: ', event)
    const params = {
      TableName,
      Item: {
        windowEnd: event.window.end,
        windowStart: event.window.start,
        sales: event.state.sales,
        shardId: event.shardId
      }
    }
    return await docClient.put(params).promise()
  }
  console.log(event)
  // Create the state object on first invocation or use state passed in
  let state = event.state
  if (isEmpty(state)) {
    state = {
      sales: 0
    }
  }
  console.log('Existing: ', state)
  // Process records with custom aggregation logic
  event.Records.forEach((item) => {
    // Only processing INSERTs
    if (item.eventName !== 'INSERT') return
    // Add sales to total (numbers in the stream image arrive as strings)
    const value = parseFloat(item.dynamodb.NewImage.sales.N)
    console.log('Adding: ', value)
    state.sales += value
  })
  // Return the state for the next invocation
  console.log('Returning state: ', state)
  return { state: state }
}
This function code processes the incoming event to aggregate a sales attribute, and return this aggregated result in a state object. In the final invocation, it stores the aggregated value in another DynamoDB table.
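The same pattern extends to other aggregates. As a minimal sketch, an average can be carried across invocations by keeping a running sum and count in the state and dividing only at the end of the window. The `accumulate` and `finalize` helpers below are hypothetical and operate on plain numbers rather than stream records.

```javascript
// Hypothetical helper: folds one batch of values into the running state.
// An empty state object (first invocation) is treated as sum 0 and count 0.
function accumulate(state, values) {
  const next = { sum: state.sum || 0, count: state.count || 0 }
  for (const value of values) {
    next.sum += value
    next.count += 1
  }
  return next
}

// Hypothetical helper: derives the final aggregates at the end of the window
function finalize(state) {
  const sum = state.sum || 0
  const count = state.count || 0
  return { sum, count, average: count === 0 ? null : sum / count }
}

// Two made-up batches processed within one window
let runningState = accumulate({}, [10, 20])
runningState = accumulate(runningState, [30])
console.log(finalize(runningState))
```

Storing the sum and count instead of the average itself keeps the result exact regardless of how records are split across batches.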
I then use this Node.js script to generate random sample order data:
const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindows'
const ITERATIONS = 100
const SLEEP_MS = 100
let totalSales = 0
// Pause between writes to spread the orders over time
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms))
}
const createSales = async () => {
  for (let i = 0; i < ITERATIONS; i++) {
    // Random whole-number sale amount between 0 and 100
    const sales = Math.round(100 * Math.random())
    totalSales += sales
    console.log({ i, sales, totalSales })
    await docClient.put({
      TableName,
      Item: {
        ID: Date.now().toString(),
        sales,
        timeStamp: new Date().toString()
      }
    }).promise()
    await sleep(SLEEP_MS)
  }
}
const main = async () => {
  await createSales()
  console.log('Total Sales: ', totalSales)
}
main().catch(console.error)
Once the script is complete, the console shows the individual order transactions and the total sales:
After the tumbling window duration is finished, the second DynamoDB table shows the aggregate values calculated and stored by the Lambda function:
Since aggregation for each shard is independent, the totals are stored by shardId. If I continue to run the test data script, the aggregation function continues to calculate and store more totals per tumbling window period.
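If a single figure per window is needed, the per-shard rows can be combined afterwards. The sketch below sums rows that share the same window start. It is an illustration only: the `totalsByWindow` helper and the sample rows are made up, and it assumes that rows from different shards carry identical `windowStart` values.

```javascript
// Hypothetical helper: sums per-shard aggregates that share a window start
function totalsByWindow(rows) {
  const totals = new Map()
  for (const { windowStart, sales } of rows) {
    totals.set(windowStart, (totals.get(windowStart) || 0) + sales)
  }
  return totals
}

// Made-up rows in the shape written by the aggregation function
const aggregationRows = [
  { shardId: 'shard-A', windowStart: '2020-12-15T10:00:00Z', windowEnd: '2020-12-15T10:00:30Z', sales: 120 },
  { shardId: 'shard-B', windowStart: '2020-12-15T10:00:00Z', windowEnd: '2020-12-15T10:00:30Z', sales: 80 },
  { shardId: 'shard-A', windowStart: '2020-12-15T10:00:30Z', windowEnd: '2020-12-15T10:01:00Z', sales: 45 }
]
const combinedTotals = totalsByWindow(aggregationRows)
console.log(combinedTotals)
```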
Using tumbling windows with Kinesis
Kinesis data streams can also invoke a Lambda function using a tumbling window in a similar way. The biggest difference is that you control how many shards are used in the data stream. Since aggregation occurs per shard, this controls the total number of aggregate results per tumbling window.
Using the same sales example, first I create a Kinesis data stream with one shard. I reuse the DynamoDB table that stores the aggregate calculation from the previous example, then create a Lambda function with a trigger from the Kinesis data stream. The event source mapping is created with a Tumbling window duration of 30 seconds:
I use the following code in the Lambda function, modified to process the incoming Kinesis data event:
const AWS = require('aws-sdk')
AWS.config.update({ region: process.env.AWS_REGION })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindowsAggregation'
// Returns true when the state object has no attributes (first invocation)
function isEmpty(obj) {
  return Object.keys(obj).length === 0
}
exports.handler = async (event) => {
  // Save aggregation result in the final invocation
  if (event.isFinalInvokeForWindow) {
    console.log('Final: ', event)
    const params = {
      TableName,
      Item: {
        windowEnd: event.window.end,
        windowStart: event.window.start,
        sales: event.state.sales,
        shardId: event.shardId
      }
    }
    console.log({ params })
    // Return here so that no state is returned from the final invocation
    return await docClient.put(params).promise()
  }
  console.log(JSON.stringify(event, null, 2))
  // Create the state object on first invocation or use state passed in
  let state = event.state
  if (isEmpty(state)) {
    state = {
      sales: 0
    }
  }
  console.log('Existing: ', state)
  // Process records with custom aggregation logic
  event.Records.forEach((record) => {
    // Kinesis record data is base64-encoded
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8')
    const item = JSON.parse(payload).Item
    // Add sales to total
    const value = parseFloat(item.sales)
    console.log('Adding: ', value)
    state.sales += value
  })
  // Return the state for the next invocation
  console.log('Returning state: ', state)
  return { state: state }
}
This function code processes the incoming event in the same way as the previous example. I then use this Node.js script to generate random sample order data, modified to put the data on the Kinesis stream:
const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const kinesis = new AWS.Kinesis()
const StreamName = 'testStream'
const ITERATIONS = 100
const SLEEP_MS = 10
let totalSales = 0
// Pause between writes to spread the orders over time
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms))
}
const createSales = async () => {
  for (let i = 0; i < ITERATIONS; i++) {
    // Random whole-number sale amount between 0 and 100
    const sales = Math.round(100 * Math.random())
    totalSales += sales
    console.log({ i, sales, totalSales })
    const data = {
      Item: {
        ID: Date.now().toString(),
        sales,
        timeStamp: new Date().toString()
      }
    }
    // A single partition key sends every record to the same shard
    await kinesis.putRecord({
      Data: Buffer.from(JSON.stringify(data)),
      PartitionKey: 'PK1',
      StreamName
    }).promise()
    await sleep(SLEEP_MS)
  }
}
const main = async () => {
  await createSales()
  console.log('Total Sales: ', totalSales)
}
main().catch(console.error)
Once the script is complete, the console shows the individual order transactions and the total sales:
After the tumbling window duration is finished, the second DynamoDB table shows the aggregate values calculated and stored by the Lambda function:
As there is only one shard in this Kinesis stream, there is only one aggregation value for all the data items in the test.
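The encoding round trip between the producer script and the aggregation function can be checked locally. In this minimal sketch, `encodeForKinesis` and `decodeKinesisRecord` are hypothetical helpers: the first mimics the base64 form in which the record data reaches the function, and the second mirrors the decoding step in the handler.

```javascript
// Hypothetical helper: mimics how the JSON payload appears in the Lambda
// event, where the record data is delivered as a base64 string
function encodeForKinesis(data) {
  return Buffer.from(JSON.stringify(data)).toString('base64')
}

// Hypothetical helper: mirrors the decoding step in the aggregation function
function decodeKinesisRecord(record) {
  const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8')
  return JSON.parse(payload)
}

// Made-up record in the shape the handler reads
const sampleRecord = {
  kinesis: { data: encodeForKinesis({ Item: { ID: '1', sales: 42 } }) }
}
const decoded = decodeKinesisRecord(sampleRecord)
console.log(decoded.Item.sales)
```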
Conclusion
With tumbling windows, you can calculate aggregate values in near-real time for Kinesis data streams and DynamoDB streams. Unlike existing stream-based invocations, state can be passed forward from one Lambda invocation to the next. This makes it easier to calculate sums, averages, and counts on values across multiple batches of data.
In this post, I walk through an example that aggregates sales data stored in Kinesis and DynamoDB. In each case, I create an aggregation function with an event source mapping that uses the new tumbling window duration attribute. I show how state is passed between invocations and how to persist the aggregated value at the end of the tumbling window.
To learn more about how to use this feature, read the developer documentation for DynamoDB and Kinesis Streams. To learn more about building with serverless technology, visit Serverless Land.