AWS Compute Blog

Using Amazon MSK as an event source for AWS Lambda

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available service that uses Apache Kafka to process real-time streaming data. Many producers can send messages to Kafka, which can then be routed to and processed by multiple consumers. Lambda now supports Amazon MSK as an event source, so it can consume messages and integrate with downstream serverless workflows.

Apache Kafka is a distributed streaming platform that is similar to Amazon Kinesis. Amazon MSK simplifies the setup, scaling, and management of clusters running Kafka. It also makes it easier to configure applications for multiple Availability Zones and to secure them with IAM. It’s fully compatible with Kafka and supports familiar community-built tools such as MirrorMaker, Apache Flink, and Prometheus.

In this blog post, I explain how to set up a test Amazon MSK cluster and configure key elements in the networking configuration. I also show how to create a Lambda function that is invoked by messages in Amazon MSK topics.

Overview

Using Amazon MSK as an event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.

Lambda is a consumer application for your Kafka topic. It processes records from one or more partitions and sends the payload to the target function. Lambda continues to process batches until there are no more messages in the topic.

The Lambda function’s event payload contains an array of records. Each array item contains details of the topic and Kafka partition identifier, together with a timestamp and base64 encoded message:

MSK Lambda event payload
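
The exact fields can vary by feature version, but for the walkthrough in this post the payload resembles the following sketch. The values shown here are illustrative: the key of each entry in records combines the topic name and partition number, and value holds the base64 encoded message (“Message #1” in this example).

{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1596480920837,
                "timestampType": "CREATE_TIME",
                "value": "TWVzc2FnZSAjMQ=="
            }
        ]
    }
}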

Network configuration overview

Amazon MSK is a highly available service, so it must be configured to run in a minimum of two Availability Zones in your preferred Region. To comply with security best practices, the brokers are usually configured in private subnets in each Availability Zone.

For Amazon MSK to invoke Lambda, you must ensure that there is a NAT Gateway running in a public subnet of the VPC. It’s possible to route the traffic to a single NAT Gateway in one Availability Zone for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT Gateway available in each Availability Zone.

Amazon MSK network architecture

The Lambda function target in the event source mapping does not need to be running in a VPC to receive messages from Amazon MSK. To learn more about configuring the private subnet route table to use a NAT Gateway, see this Premium Support response. For an example of how to configure the infrastructure with AWS CloudFormation, see this template.
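
As a rough sketch, the route that sends outbound traffic from the private subnets through the NAT Gateway looks like the following CloudFormation fragment. The resource names here are placeholders, and the route table must also be associated with each private subnet using an AWS::EC2::SubnetRouteTableAssociation resource.

  PrivateRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NatGateway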

Required Lambda function permissions

The Lambda function must have permission to describe VPCs and security groups, and manage elastic network interfaces. These execution role permissions are:

  • ec2:CreateNetworkInterface
  • ec2:DescribeNetworkInterfaces
  • ec2:DescribeVpcs
  • ec2:DeleteNetworkInterface
  • ec2:DescribeSubnets
  • ec2:DescribeSecurityGroups

To access the Amazon MSK data stream, the Lambda function also needs two Kafka permissions: kafka:DescribeCluster and kafka:GetBootstrapBrokers. The policy template AWSLambdaMSKExecutionRole includes these permissions.
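
If you prefer to define a customer managed policy instead, a minimal policy statement covering the permissions listed above might look like the following sketch. Note that the AWSLambdaMSKExecutionRole managed policy also includes CloudWatch Logs permissions for function logging.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka:DescribeCluster",
                "kafka:GetBootstrapBrokers",
                "ec2:CreateNetworkInterface",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeVpcs",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups"
            ],
            "Resource": "*"
        }
    ]
}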

In AWS SAM templates, you can configure a Lambda function with an Amazon MSK event source mapping and the necessary permissions. For example:

Resources:
  ProcessMSKfunction:
    Type: AWS::Serverless::Function 
    Properties:
      CodeUri: code/
      Timeout: 3
      Handler: app.handler
      Runtime: nodejs12.x
      Policies:
      - AWSLambdaMSKExecutionRole
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1
            Topics:
            - MyTopic

Setting up an Amazon MSK cluster

Before starting, configure a new VPC with public and private subnets in two Availability Zones using this AWS CloudFormation template. In this configuration, the private subnets are set up to use a NAT Gateway.

To create the Amazon MSK cluster:

  1. From the Amazon MSK console, choose Create Cluster.
  2. Select Create cluster with custom settings, apply the name “my-MSK-cluster”, and keep the recommended Apache Kafka version.
  3. In the Configuration panel, keep the “Use the MSK default configuration” option selected.
  4. In Networking, select the new VPC and choose “2” for Number of Availability Zones. From the dropdowns, select the two Availability Zones in the VPC, and choose the private subnets for each.
  5. In the Brokers panel, choose kafka.t3.small for Broker instance type, and enter “1” for Number of brokers per Availability Zone.
  6. For Storage, enter “1” for EBS storage volume per broker.
  7. Keep the existing defaults and choose Create cluster. It takes up to 15 minutes to create the cluster and the status is displayed in the Cluster Summary panel.
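
If you prefer to script this step, you can create the same cluster with the AWS CLI. This is a sketch only: substitute your private subnet IDs, a security group, and the currently recommended Kafka version.

aws kafka create-cluster \
    --cluster-name my-MSK-cluster \
    --kafka-version <recommended version> \
    --number-of-broker-nodes 2 \
    --broker-node-group-info '{
        "InstanceType": "kafka.t3.small",
        "ClientSubnets": ["<private-subnet-1>", "<private-subnet-2>"],
        "SecurityGroups": ["<security-group-id>"],
        "StorageInfo": {"EbsStorageInfo": {"VolumeSize": 1}}
    }'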

Configuring the Lambda event source mapping

You can create the Lambda event source mapping using the AWS CLI or AWS SDK, which provide the CreateEventSourceMapping API. In this walkthrough, you use the AWS Management Console to create the event source mapping.
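
For reference, the equivalent AWS CLI command is sketched below, reusing the example cluster ARN and function name from the AWS SAM template earlier. Substitute your own cluster ARN and function name.

aws lambda create-event-source-mapping \
    --function-name ProcessMSKfunction \
    --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1 \
    --topics mytopic \
    --starting-position LATEST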

First, you must create an Amazon MSK topic:

  1. Launch an EC2 instance, selecting t2.micro as the instance size. Connect to the instance once it is available.
  2. Follow these instructions in the Amazon MSK documentation to create a topic. Use the name “mytopic” and a replication-factor of 2, since there are only two Availability Zones in this test.
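
The topic creation command in the Amazon MSK documentation generally takes the following form. This is a sketch only: replace <ZookeeperConnectString> with the value shown in your cluster’s details, and note that newer Kafka client versions use --bootstrap-server with the broker string instead.

bin/kafka-topics.sh --create \
    --zookeeper <ZookeeperConnectString> \
    --replication-factor 2 \
    --partitions 1 \
    --topic mytopic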

Now create a Lambda function that uses the Amazon MSK cluster and topic as an event source:

  1. From the Lambda console, select Create function.
  2. Enter a function name, and select Node.js 12.x as the runtime.
  3. Select the Permissions tab, and select the role name in the Execution role panel to open the IAM console.
  4. Choose Attach policies.
  5. In the filter box, enter “MSK”, then check the AWSLambdaMSKExecutionRole policy name.
  6. Choose Attach policy.
  7. Back in the Lambda function, select the Configuration tab. In the Designer panel, choose Add trigger.
  8. In the dropdown, select MSK. In the MSK cluster box, select the cluster you configured earlier. Enter “mytopic” for Topic name and choose “Latest” for Starting Position. Choose Add.
    Note: it takes several minutes for the trigger status to update from Creating to Enabled.
  9. In the Function code panel, replace the contents of index.js with:
    exports.handler = async (event) => {
        // Iterate through keys
        for (let key in event.records) {
          console.log('Key: ', key)
          // Iterate through records
          event.records[key].map((record) => {
            console.log('Record: ', record)
            // Decode base64
            const msg = Buffer.from(record.value, 'base64').toString()
            console.log('Message:', msg)
          }) 
        }
    }
  10. Choose Save.

Testing the event source mapping

At this point, you have created a VPC with public and private subnets in two Availability Zones and a NAT Gateway. You have placed a new Amazon MSK cluster in the two private subnets, and set up a target Lambda function with the necessary IAM permissions. Next, you publish messages to the Amazon MSK topic and see the resulting invocation in the Lambda function’s logs.

  1. From the EC2 instance you created earlier, follow these instructions to produce messages (a sample producer command is sketched after this list). In the Kafka console producer script running in the terminal, enter “Message #1”.
  2. Back in the Lambda function, select the Monitoring tab and choose View logs in CloudWatch. Select the first log stream.
  3. In the Log events panel, expand the entries to see the message sent from the Amazon MSK topic. Note that the value attribute containing the message is base64 encoded.
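
The producer command in step 1 generally looks like the following sketch, where <BootstrapBrokerString> is the broker connection string for your cluster (available from the cluster details page or the aws kafka get-bootstrap-brokers CLI command). Each line you type in the terminal is sent as a message to the topic.

bin/kafka-console-producer.sh \
    --broker-list <BootstrapBrokerString> \
    --topic mytopic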

Conclusion

Amazon MSK provides a fully managed, highly available service that uses Kafka to process real-time streaming data. Now that Lambda supports Amazon MSK as an event source, you can invoke Lambda functions from messages in Kafka topics and integrate them into your downstream serverless workflows.

In this post, I give an overview of how the integration compares with other event source mappings. I show how to create a test Amazon MSK cluster, configure the networking, and create the event source mapping with Lambda. I also show how to set up the Lambda function in the AWS Management Console, and refer to the equivalent AWS SAM syntax to simplify deployment.

To learn more about how to use this feature, read the documentation.