AWS Big Data Blog

Amazon MSK Serverless now supports Kafka clients written in all programming languages

Amazon MSK Serverless is a cluster type for Amazon Managed Streaming for Apache Kafka (Amazon MSK) that offers the most straightforward way to run Apache Kafka clusters without having to manage compute and storage capacity. With MSK Serverless, you can run your applications without having to provision, configure, or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring and moving them to evenly balance load across the cluster.

With today’s launch, MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. Administrators can simplify and standardize access control to Kafka resources using AWS Identity and Access Management (IAM). This support for IAM in Amazon MSK is based on SASL/OAUTHBEARER, an open standard for authorization and authentication.

In this post, we show how you can connect your applications to MSK Serverless with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including Java, Python, Go, JavaScript, and .NET. Using IAM authentication and authorization is the preferred choice of many customers because you can secure Kafka resources just like you do with all other AWS services. Additionally, you get all the other benefits of IAM, such as temporary role-based credentials, precisely scoped permission policies, and more. Now you can use MSK Serverless with IAM-based authentication more broadly with the support for multiple languages.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and defining identity policies that grant them precisely scoped access permissions. For example, you can create an IAM user and a policy that allows the user to write to a specific Kafka topic but restricts access to other resources, without worrying about managing Kafka ACLs. After you provide the identity policies with the necessary permissions, you can configure client applications to use IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. With OAUTHBEARER support, you can build clients that can work across both Amazon MSK and other Kafka environments. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, Amazon MSK provides new code libraries in the AWS GitHub repo that generate the OAUTHBEARER token for popular programming languages, including Python, Go, JavaScript, and .NET.

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed, base64-encoded transformation of your IAM identity credentials.
  2. The client sends this token to Amazon MSK using the bootstrap address along with its request to access Apache Kafka resources.
  3. The MSK Serverless cluster decodes the OAUTHBEARER token, validates the credentials, and checks whether the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider (see the sketch following this list).
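To make the token lifecycle concrete, the following minimal sketch uses the Python signer library (installed later in this post) to generate a token outside of a Kafka client. The Region string is a placeholder; per the signer library, the second return value is the token's expiry, which Kafka client libraries use to schedule refreshes.

# A minimal sketch: generate an OAUTHBEARER token and inspect it.
# The Region is a placeholder; credentials are resolved from the
# default AWS credential provider chain.
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-east-1")
print(token[:40] + "...")  # base64-encoded, signed payload
print("Token expires at (ms since epoch):", expiry_ms)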

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK Serverless clusters at no additional cost. Before you start, you need to configure the IAM identities and policies that define the client's permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}
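After you have written the policy, attach it to the IAM identity your client uses. The following is a minimal sketch of doing so with boto3; the policy name, role name, and abbreviated policy document are hypothetical placeholders.

import json
import boto3

# Abbreviated version of the authorization policy shown above (hypothetical).
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/*"
            ]
        }
    ]
}

iam = boto3.client("iam")

# Create the policy and attach it to the role the Kafka client assumes.
policy = iam.create_policy(
    PolicyName="MyTestClusterKafkaAccess",  # hypothetical policy name
    PolicyDocument=json.dumps(policy_document),
)
iam.attach_role_policy(
    RoleName="my-kafka-client-role",  # hypothetical client role
    PolicyArn=policy["Policy"]["Arn"],
)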

Configure the client

You need to make code changes to your application so the clients can use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. You also need to make sure the security group associated with your MSK Serverless cluster has an inbound rule that allows traffic from the client applications in the associated VPCs on TCP port 9098. A sketch of such a rule follows.
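The following is a minimal sketch of adding such an inbound rule with boto3; both security group IDs are hypothetical placeholders.

import boto3

ec2 = boto3.client("ec2")

# Allow the clients' security group to reach the cluster's security group
# on TCP port 9098, the port MSK Serverless uses for IAM authentication.
ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",  # the MSK Serverless cluster's security group
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 9098,
            "ToPort": 9098,
            "UserIdGroupPairs": [
                {"GroupId": "sg-0fedcba9876543210"}  # the clients' security group
            ],
        }
    ],
)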

You must use a Kafka client library that provides support for SASL with OAUTHBEARER authentication.

For this post, we use the Python programming language, with kafka-python (https://github.com/dpkp/kafka-python) as our Kafka client library.

Amazon MSK provides you with a new code library for each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for Python with your Kafka client library, run the following command:
    $ pip install aws-msk-iam-sasl-signer-python
  2. Import the installed Amazon MSK IAM SASL signer library in your code:
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    class MSKTokenProvider():
        def token(self):
            token, _ = MSKAuthTokenProvider.generate_auth_token('<your aws region>')
            return token
  4. Specify security_protocol as SASL_SSL and sasl_mechanism as oauthbearer in your Python Kafka client properties, and pass the token provider in the configuration object:
    tp = MSKTokenProvider()

    producer = KafkaProducer(
        bootstrap_servers='<Amazon MSK Serverless bootstrap string>',
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=tp,
        client_id='my.kafka.client.unique.id',
    )

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the aws-msk-iam-sasl-signer-python GitHub repo.
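Putting the preceding steps together, the following is a minimal end-to-end producer sketch. The topic name is a hypothetical placeholder, and the Region and bootstrap string placeholders are filled in from your own account and cluster.

from kafka import KafkaProducer
from kafka.errors import KafkaError
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider


class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('<your aws region>')
        return token


producer = KafkaProducer(
    bootstrap_servers='<Amazon MSK Serverless bootstrap string>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=MSKTokenProvider(),
    client_id='my.kafka.client.unique.id',
)

try:
    # 'my-topic' is a placeholder; use a topic your IAM policy allows writes to.
    future = producer.send('my-topic', b'hello from MSK Serverless')
    record_metadata = future.get(timeout=10)
    print('Wrote to', record_metadata.topic, 'partition', record_metadata.partition)
except KafkaError as err:
    print('Failed to produce:', err)
finally:
    producer.flush()
    producer.close()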

Conclusion

MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. You can run your applications without having to configure and manage the infrastructure or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring them and automatically balancing their distribution across brokers in the cluster.

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers on architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers' use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.