AWS Big Data Blog

AWS Glue mutual TLS authentication for Amazon MSK

In today’s landscape, data streams continuously from countless sources such as social media interactions to Internet of Things (IoT) device readings. This torrent of real-time information presents both a challenge and an opportunity for businesses. To harness the power of this data effectively, organizations need robust systems for ingesting, processing, and analyzing streaming data at scale. Enter Apache Kafka: a distributed streaming platform that has revolutionized how companies handle real-time data pipelines and build responsive, event-driven applications. AWS Glue is used to process and analyze large volumes of real-time data and perform complex transformations on the streaming data from Apache Kafka.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service. You can activate a combination of authentication modes on new or existing MSK clusters. The supported authentication modes are AWS Identity and Access Management (IAM) access control, mutual Transport Layer Security (TLS), and Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM). For more information about using IAM authentication, refer to Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication.

Mutual TLS authentication requires both the server and the client to present certificates to prove their identity. It’s ideal for hybrid applications that need a common authentication model. It’s also a commonly used authentication mechanism for business-to-business applications and is used in standards such as open banking, which enables secure open API integrations for financial institutions. For Amazon MSK, AWS Private Certificate Authority (AWS Private CA) is used to issue the X.509 certificates and for authenticating clients.

This post describes how to set up AWS Glue jobs to produce, consume, and process messages on an MSK cluster using mutual TLS authentication. AWS Glue will automatically infer the schema from the streaming data and store the metadata in the AWS Glue Data Catalog for analysis using analytics tools such as Amazon Athena.

Example use case

In our example use case, a hospital facility regularly monitors the body temperatures for patients admitted in the emergency ward using smart thermometers. Each device automatically records the patients’ temperature readings and posts the records to a central monitoring application API. Each posted record is a JSON formatted message that contains the deviceId that uniquely identifies the thermometer, a patientId to identify the patient, the patient’s temperature reading, and the eventTime when the temperature was recorded.

Record schema

The central monitoring application checks the hourly average temperature readings for each patient and notifies the hospital’s healthcare workers when a patient’s average temperature exceeds accepted thresholds (36.1–37.2°C). In our case, we use the Athena console to analyze the readings.

Overview of the solution

In this post, we use an AWS Glue Python shell job to simulate incoming data from the hospital thermometers. This job produces messages that are securely written to an MSK cluster using mutual TLS authentication.

To process the streaming data from the MSK cluster, we deploy an AWS Glue Streaming extract, transform, and load (ETL) job. This job automatically infers the schema from the incoming data, stores the schema metadata in the Data Catalog, and then stores the processed data as efficient Parquet files in Amazon Simple Storage Service (Amazon S3). We use Athena to query the output table in the Data Catalog and uncover insights.

The following diagram illustrates the architecture of the solution.

Solution architecture

The solution workflow consists of the following steps:

  1. Create a private certificate authority (CA) using AWS Certificate Manager (ACM).
  2. Set up an MSK cluster with mutual TLS authentication.
  3. Create a Java keystore (JKS) file and generate a client certificate and private key.
  4. Create a Kafka connection in AWS Glue.
  5. Create a Python shell job in AWS Glue to create a topic and push messages to Kafka.
  6. Create an AWS Glue Streaming job to consume and process the messages.
  7. Analyze the processed data in Athena.

Prerequisites

You should have the following prerequisites:

Cloud Formation stack set

This template creates two NAT gateways as shown in the following diagram. However, it’s possible to route the traffic to a single NAT gateway in one Availability Zone for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT gateway available in each Availability Zone.

VPC setup

The stack also creates a security group with a self-referencing rule to allow communication between AWS Glue components.

Create a private CA using ACM

Complete the following steps to create a root CA. For more details, refer to Creating a private CA.

  1. On the AWS Private CA console, choose Create a private CA.
  2. For Mode options, select either General-purpose or Short-lived certificate for lower pricing.
  3. For CA type options, select Root.
  4. Provide certificate details by providing at least one distinguished name.

Create private CA

  1. Leave the remaining default options and select the acknowledge checkbox.
  2. Choose Create CA.
  3. On the Actions menu, choose Install CA certificate and choose Confirm and install.

Install certificate

Set up an MSK cluster with mutual TLS authentication

Before setting up the MSK cluster, make sure you have a VPC with at least two private subnets in different Availability Zones and a NAT gateway with a route to the internet. A CloudFormation template is provided in the prerequisites section.

Complete the following steps to set up your cluster:

  1. On the Amazon MSK console, choose Create cluster.
  2. For Creation method, Custom create.
  3. For Cluster type, select Provisioned.
  4. For Broker size, you can choose kafka.t3.small for the purpose of this post.
  5. For Number of zones, choose 2.
  6. Choose Next.
  7. In the Networking section, select the VPC, private subnets, and security group you created in the prerequisites section.
  8. In the Security settings section, under Access control methods, select TLS client authentication through AWS Certificate Manager (ACM).
  9. For AWS Private CAs, choose the AWS private CA you created earlier.

The MSK cluster creation can take up to 30 minutes to complete.

Create a JKS file and generate a client certificate and private key

Using the root CA, you generate client certificates to use for authentication. The following instructions are for CloudShell, but can also be adapted for a client machine with Java and the AWS CLI installed.

  1. Open a new CloudShell session and run the following commands to create the certs directory and install Java:
mkdir certs
cd certs
sudo yum -y install java-11-amazon-corretto-headless
  1. Run the following command to create a keystore file with a private key in JKS format. Replace Distinguished-NameExample-AliasYour-Store-Pass, and Your-Key-Pass with strings of your choice:

keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12

  1. Generate a certificate signing request (CSR) with the private key created in the preceding step:

keytool -keystore kafka.client.keystore.jks -certreq -file csr.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass

  1. Run the following command to remove the word NEW (and the single space that follows it) from the beginning and end of the file:

sed -i -E '1,$ s/NEW //' csr.pem

The file should start with -----BEGIN CERTIFICATE REQUEST----- and end with -----END CERTIFICATE REQUEST-----

  1. Using the CSR file, create a client certificate using the following command. Replace Private-CA-ARN with the ARN of the private CA you created.

aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://csr.pem --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"

The command should print out the ARN of the issued certificate. Save the CertificateArn value for use in the next step.

{
"CertificateArn": "arn:aws:acm-pca:region:account:certificate-authority/CA_ID/certificate/certificate_ID"
}
  1. Use the Private-CA-ARN together with the CertificateArn (arn:aws:acp-pca:<region>:...) generated in the preceding step to retrieve the signed client certificate. This will create a client-cert.pem file.

aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN | jq -r '.Certificate + "\n" + .CertificateChain' >> client-cert.pem

  1. Add the certificate into the Java keystore so you can present it when you talk to the MSK brokers:

keytool -keystore kafka.client.keystore.jks -import -file client-cert.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass -noprompt

  1. Extract the private key from the JKS file. Provide the same destkeypass and deststorepass and enter the keystore password when prompted.

keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore keystore.p12 -srcalias Example-Alias -deststorepass Your-Store-Pass -destkeypass Your-Key-Pass -deststoretype PKCS12

  1. Convert the private key to PEM format. Enter the keystore password you provided in the previous step when prompted.

openssl pkcs12 -in keystore.p12 -nodes -nocerts -out private-key.pem

  1. Remove the lines that begin with Bag Attributes.. from the top of the file:

sed -i -ne '/-BEGIN PRIVATE KEY-/,/-END PRIVATE KEY-/p' private-key.pem

  1. Upload the client-cert.pem, client.keystore.jks, and private-key.pem files to Amazon S3. You can either create a new S3 bucket or use an existing bucket to store the following objects. Replace <s3://aws-glue-assets-11111111222222-us-east-1/certs/> with your S3 location.

aws s3 sync ~/certs s3://aws-glue-assets-11111111222222-us-east-1/certs/ --exclude '*' --include 'client-cert.pem' --include 'private-key.pem' --include 'kafka.client.keystore.jks'

Create a Kafka connection in AWS Glue

Complete the following steps to create a Kafka connection:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Select Apache Kafka and choose Next.
  4. For Amazon Managed Streaming for Apache Kafka Cluster, choose the MSK cluster you created earlier.

Create Glue Kafka connection

  1. Choose TLS client authentication for Authentication method.
  2. Enter the S3 path to the keystore you created earlier and provide the keystore and client key passwords you used for the -storepass and -keypass

Add authentication method to connection

  1. Under Networking options, choose your VPC, a private subnet, and a security group. The security group should contain a self-referencing rule.
  2. On the next page, provide a name for the connection (for example, Kafka-connection) and choose Create connection.

Create a Python shell job in AWS Glue to create a topic and push messages to Kafka

In this section, you create a Python shell job to create a new Kafka topic and push JSON messages to the topic. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs.
  2. In the Script section, for Engine, choose Python shell.
  3. Choose Create script.

Create Python shell job

  1. Enter the following script in the editor:
import sys
from awsglue.utils import getResolvedOptions
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka.errors import TopicAlreadyExistsError
from urllib.parse import urlparse

import json
import uuid
import datetime
import boto3
import time
import random

# Fetch job parameters
args = getResolvedOptions(sys.argv, ['connection-names', 'client-cert', 'private-key'])

# Download client certificate and private key files from S3
TOPIC = 'example_topic'
client_cert = urlparse(args['client_cert'])
private_key = urlparse(args['private_key'])

s3 = boto3.client('s3')
s3.download_file(client_cert.netloc, client_cert.path.lstrip('/'),  client_cert.path.split('/')[-1])
s3.download_file(private_key.netloc, private_key.path.lstrip('/'),  private_key.path.split('/')[-1])

# Fetch bootstrap servers from connection
args = getResolvedOptions(sys.argv, ['connection-names'])
if ',' in args['connection_names']:
    raise ValueError("Choose only one connection name in the job details tab!")
glue_client = boto3.client('glue')
response = glue_client.get_connection(Name=args['connection_names'], HidePassword=True)
bootstrapServers = response['Connection']['ConnectionProperties']['KAFKA_BOOTSTRAP_SERVERS']

# Create topic and push messages 
admin_client = KafkaAdminClient(bootstrap_servers= bootstrapServers, security_protocol= 'SSL', ssl_certfile= client_cert.path.split('/')[-1], ssl_keyfile= private_key.path.split('/')[-1])
try:
    admin_client.create_topics(new_topics=[NewTopic(name=TOPIC, num_partitions=1, replication_factor=1)], validate_only=False)
except TopicAlreadyExistsError:
    # Topic already exists
    pass
admin_client.close()

# Generate JSON messages for the new topic
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'), bootstrap_servers=bootstrapServers, security_protocol='SSL', 
                         ssl_check_hostname=True, ssl_certfile= client_cert.path.split('/')[-1], ssl_keyfile= private_key.path.split('/')[-1])
                         
for i in range(1200):
    _event = {
        "deviceId": str(uuid.uuid4()),
        "patientId": "PI" + str(random.randint(1,15)).rjust(5, '0'),
        "temperature": round(random.uniform(32.1, 40.9), 1),
        "eventTime": str(datetime.datetime.now())
    }
    producer.send(TOPIC, _event)
    time.sleep(3)
    
producer.close()
  1. On the Job details tab, provide a name for your job, such as Kafka-msk-producer.
  2. Choose an IAM role. If you don’t have one, create one following the instructions in Configuring IAM permissions for AWS Glue.
  3. Under Advanced properties, for Connections, choose the Kafka-connection connection you created.
  4. Under Job parameters, add the following parameters and values:
    1. Key: --additional-python-modules, value: kafka-python.
    2. Key: --client-cert, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/client-cert.pem. Replace with your client-cert.pem Amazon S3 location from earlier.
    3. Key: --private-key, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/private-key.pem. Replace with your private-key.pem Amazon S3 location from earlier.
      AWS Glue Job parameters
  5. Save and run the job.

You can confirm that the job run status is Running on the Runs tab.

At this point, we have successfully created a Python shell job to simulate the thermometers sending temperature readings to the monitoring application. The job will run for approximately 1 hour and push 1,200 records to Amazon MSK.

Alternatively, you can replace the Python shell job with a Scala ETL job to act as a producer to send messages to the MSK cluster. In this case, use the JKS file for authentication using ssl.keystore.type=JKS. If you’re using PEM format keys, the current version of Kafka clients libraries (2.4.1) installed in AWS Glue version 4 don’t yet support authentication through certificates in PEM format (as of this writing).

Create an AWS Glue Streaming job to consume and process the messages

You can now create an AWS Glue ETL job to consume and process the messages in the Kafka topic. AWS Glue will automatically infer the schema from the files. Complete the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Choose Visual ETL to author a new job.
  3. For Sources, choose Apache Kafka.
  4. For Connection name, choose the node and connection name you created earlier.
  5. For Topic name, enter the topic name (example_topic) you created earlier.
  6. Leave the rest of the options as default.

Kafka data source

  1. Add a new target node called Amazon S3 to store the output Parquet files generated from the streaming data.
  2. Choose Parquet as the data format and provide an S3 output location for the generated files.
  3. Select the option to allow AWS Glue to create a table in the Data Catalog and provide the database and table names.

S3 Output node

  1. On the job details tab, provide the following options:
    1. For the requested number of workers, enter 2.
    2. For IAM Role, choose an IAM role with permissions to read and write to the S3 output location.
    3. For Job timeout, enter 60 (for the job to stop after 60 minutes).
    4. Under Advanced properties, for Connections, choose the connection you created.
  2. Save and run the job.

You can confirm the S3 output location for new Parquet files created under the prefixes s3://<output-location>/ingest_year=XXXX/ingest_month=XX/ingest_day=XX/ingest_hour=XX/.

At this point, you have created a streaming job to process events from Amazon MSK and store the JSON formatted records as Parquet files in Amazon S3. AWS Glue streaming jobs are meant to be running continuously to process streaming data. We have set the timeout to stop the job after 60 minutes. You can also stop the job manually after the records have been processed to Amazon S3.

Analyze the data in Athena

Going back to our example use case, you can run the following query in Athena to monitor and track the hourly average temperature readings for patients that exceed the normal thresholds (36.1–37.2°C):

SELECT
date_format(parse_datetime(eventTime, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), '%h %p') hour,
patientId,
round(avg(temperature), 1) average_temperature,
count(temperature) readings
FROM "default"."devices_data"
GROUP BY 1, 2
HAVING avg(temperature) > 37.2 or avg(temperature) < 36.1
ORDER BY 2, 1 DESC

Amazon Athena Console

Run the query multiple times and observe how the average_temperature and the number of readings changes with new incoming data from the AWS Glue Streaming job. In our example scenario, healthcare workers can use this information to identify patients who are experiencing consistent high or low body temperatures and give the required attention.

At this point, we have successfully created and ingested streaming data to our MSK cluster using mutual TLS authentication. We only needed the certificates generated by AWS Private CA to authenticate our AWS Glue clients to the MSK cluster and process the streaming data with an AWS Glue Streaming job. Finally, we used Athena to visualize the data and observed how the data changes in near real time.

Clean up

To clean up the resources created in this post, complete the following steps:

  1. Delete the private CA you created.
  2. Delete the MSK cluster you created.
  3. Delete the AWS Glue connection you created.
  4. Stop the jobs if they are still running and delete the jobs you created.
  5. If you used the CloudFormation stack provided in the prerequisites, delete the CloudFormation stack to delete the VPC and other networking components.

Conclusion

This post demonstrated how you can use AWS Glue to consume, process, and store streaming data for Amazon MSK using mutual TLS authentication. AWS Glue Streaming automatically infers the schema and creates a table in the Data Catalog. You can then query the table using other data analysis tools like Athena, Amazon Redshift, and Amazon QuickSight to provide insights into the streaming data.

Try out the solution for yourself, and let us know your questions and feedback in the comments section.


About the Authors

Edward Okemwa OndariEdward Okemwa is a Big Data Cloud Support Engineer (ETL) at AWS Nairobi specializing in AWS Glue and Amazon Athena. He is dedicated to providing customers with technical guidance and resolving issues related to processing and analyzing large volumes of data. In his free time, he enjoys singing choral music and playing football.

Edward Okemwa OndariEmmanuel Mashandudze is a Senior Big Data Cloud Engineer specializing in AWS Glue. He collaborates with product teams to help customers efficiently transform data in the cloud. He helps customers design and implements robust data pipelines. Outside of work, Emmanuel is an avid marathon runner, sports enthusiast and enjoys creating memories with his family.