亚马逊AWS官方博客

如何通过互联网安全地访问Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群(三)- SASL/SCRAM认证

背景

承接上文如何通过互联网安全地访问Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群(一)如何通过互联网安全地访问Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群(二),我们介绍了 Amazon MSK 为用户提供了 Public Access 公开访问的特性,以及通过 IAM 访问控制和mTLS 身份验证两种方法来进行连接,今天将介绍第三种 MSK 公开访问连接的方式 – SASL/SCRAM。

SASL/SCRAM 全称是 Simple Authentication and Security Layer/ Salted Challenge Response Mechanism,在当前的场景下可以近似理解为基于用户名和密码的方式进行身份验证。SCRAM 会使用安全的 Hash 算法来保证在验证身份的过程中,在客户端和服务器之间不需要传输密码明文信息。

通过 SASL/SCRAM 验证来访问 MSK 集群的架构图如下所示。

接下来,我们先通过以下步骤来完成Amazon MSK集群的公开访问配置和验证:

  1. 通过 Secret Store 管理用户名和密码;
  2. 创建 MSK 自定义集群配置;
  3. 创建Amazon MSK集群;
  4. 开启Public Access权限;
  5. 创建公开访问的客户端;
  6. 验证公开访问;
  7. Python 客户端验证。

通过 Secret Store 管理用户名和密码

为了使用 SASL/SCRAM 认证方式,首先需要在 Secret Store 服务创建一个新的密钥,注意选择其他类型的密钥。

然后在键值对这里根据实际情况配置 username 和 password。

因为 MSK 要求必须使用自定义的 Amazon KMS Key,所以这里点击添加新的 Key。

跳转到 KMS 服务页面后,点击创建 Key,注意这里 Key 的类型必须是 Symmetric 对成加密方式,暂不支持 Asymmetric 非对称加密方式。保持其他默认选项,点击下一步。

根据实际情况输入 Key 的别名和描述。

根据实际情况设置 Key 的管理权限。

根据实际情况设置 Key 的使用权限。

创建 Key 完成后,回到 Secret Manager 服务页面,刷新并选择刚刚创建的 Key。

注意,这里密钥的名称必须以 AmazonMSK_ 开头。

保持其他默认选项,完成创建。

 创建MSK自定义集群配置

如果使用mTLS 或者 SASL/SCRAM 身份验证的方法,必须配置启用 Kafka ACL 功能,因此这里首先创建一个自定义的集群配置。此外,我们也可以配置上auto.create.topics.enable集群选项,并设置为true(默认为false),这样可以启用在服务器上自动创建topic 便于演示,更多信息可以参考文档自定义MSK集群配置

auto.create.topics.enable=true
allow.everyone.if.no.acl.found=false

创建Amazon MSK集群

参考上文,Amazon MSK 允许用户选择 Apache Kafka 2.6.0 或更高版本的 MSK 集群开启Public Access选项,并且我们需要在创建 MSK 集群后才能启用Public Access。这里仍以us-east-1美东1区域为例进行演示。

集群配置选项选择刚刚创建的自定义配置。

要启用对集群的Public Access选项,与集群关联的子网必须是公有子网Public Subnet,这意味着子网必须具有连接 Internet 网关的关联路由表,并确保选择的安全规则组配置了合适的入站规则:

对于公开访问的MSK集群,可以参考以下集群端口信息:

  • 如果是通过TLS 加密与Broker进行通信,需要通过9194 端口公开访问;
  • 如果是通过 SASL/SCRAM Broker进行通信,需要通过9196 端口公开访问;
  • 如果是通过IAM 访问控制与Broker进行通信,需要通过9198 端口公开访问;
  • 如果是通过TLS 加密与Apache Zookeeper进行通信,需要通过2182 端口公开访问。

接下来是安全设置,要想使用Public Access选项,集群必须关闭Unauthenticated access未经身份验证的访问控制选项,并且这里在示例中我们开启 SASL/SCRAM 访问控制的选项。

如果开启Public Access选项,集群内加密默认是必选项;另外,客户端和Broker之间的明文流量也会被关闭,必须使用TLS加密的方式进行。

保持其他参数为默认选项,然后创建MSK集群。

开启Public Access权限

在集群创建完成后,点击查看集群会发现这样的提示,说明这个集群开启了 SASL/SCRAM 访问控制但还没有关联 Secret Manager 中的密钥。

所以,接下来我们进行密钥关联。

在关联密钥时,我们可以选择刚刚创建的密钥进行关联。

 

然后,在集群的网络设置里选择配置公开访问。勾选开启,同时注意这次再次提醒需要确保集群的子网具有连接 Internet 网关的关联路由表,并且安全规则组限制了访问对象来源。

创建公开访问的客户端

在等待集群公开访问配置生效的过程中,我们来创建一个示例Kafka客户端,来模拟Internet访问请求。此处参照文档创建客户端实例,并根据文档创建Topic来安装Java环境。本文使用CloudShell来模拟客户端环境,更多CloudShell的详细信息可以参考文档

以下操作是在客户端实例上进行。在示例中,我们使用JVM Truststore与MSK集群进行通信,所以我们需要对应的Truststore文件,这里在客户端实例上将Java中的证书复制到当前目录下,根据用户不同的Java安装方式和路径,这里的文件路径需要进行适当的替换。

cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.342.b07-1.amzn2.0.1.x86_64/jre/lib/security/cacerts kafka.client.truststore.jks

示例中我们以 Kafka 自带的脚本工具为例演示如何进行配置,切换到 Kafka 文件夹中,并创建名为 client_sasl.properties 的配置文件,复制以下内容到配置文件中并根据实际环境进行调整:

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/home/cloudshell-user/kafka.client.truststore.jks
java.security.auth.login.config=/home/cloudshell-user/kafka_2.12-2.6.2/users_jaas.conf

创建 JAAS 配置文件 users_jaas.conf,复制对应的用户信息:

KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="david"
   password="<Your-Password>";
};

设置 KAFKA_OPTS 环境变量:

export KAFKA_OPTS=-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf

验证公开访问

首先,查看MSK集群的bootstrap连接信息:

*注:公开访问启用成功后,可以到EC2控制台查看MSK集群配置的网卡信息,可以看到MSK集群对网卡进行了公网IP对应集群的域名。

然后,通过以下参考命令生产示例消息:

bin/kafka-console-producer.sh --broker-list BootstrapBroker-String --topic mytopic --producer.config client_sasl.properties

如下图所示:

这时候输入消息,会弹出报错:

ERROR [Producer clientId=console-producer] Topic authorization failed for topics [mytopic] (org.apache.kafka.clients.Metadata)

因为和 mTLS 认证一样,都需要配置 Kafka ACL 授权才能允许访问。这里需要先查看 Zookeeper 的连接信息:

因为安全原因 MSK 不允许公开访问 Zookeeper,所以这里需要能通过 VPC 连接到集群的实例进行设置。此外,这里有两种连接方式,加密和非加密的。如果使用加密信息,需要创建对应的配置文件供 kafka-acls.sh 的 –zk-tls-config-file 参数使用,如下所示:

zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keystore.location=/home/cloudshell-user/kafka.client.keystore.jks
zookeeper.ssl.keystore.password=Your-Store-Pass
zookeeper.ssl.truststore.location=/home/cloudshell-user/kafka.client.truststore.jks
zookeeper.ssl.truststore.password=Your-Truststore-Pass

*注:kafka.client.truststore.jks 的默认密码是 changeit,可以通过以下命令进行修改:

keytool -keystore kafka.client.truststore.jks -storepass changeit -storepasswd -new 123456

通过以下命令设置对应的 ACL 规则,根据实际情况进行相应的替换:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ZooKeeper-Connection-String --add --allow-principal "User:david " --operation Write --topic mytopic

再次执行producer命令,可以看到能够成功发送消息:

再同时打开另一个客户端连接窗口,通过以下参考命令消费示例消息:

export KAFKA_OPTS=-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf

bin/kafka-console-consumer.sh --bootstrap-server BootstrapBroker-String --topic mytopic --consumer.config client_sasl.properties

 

会遇到以下报错信息:

[2022-10-19 08:40:54,573] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-59228
Processed a total of 0 messages

 

再补充读权限:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect= ZooKeeper-Connection-String --add --allow-principal "User:david" --operation Read --group=* --topic mytopic

再次运行可以正常查看消息:

以上,验证成功。

Python客户端验证

除了上述验证之外,我们也是用Python客户端进行基本的消息消费验证。其中,需要将上面的证书做一些转换以便在程序中使用。

首先,生成truststore的pem形式证书(默认密码为changeit,也可以先修改密码再生成):

keytool --list -rfc -keystore kafka.client.truststore.jks &gt;/tmp/truststore.pem

然后,将Private Key转换为pem形式证书:

openssl pkcs12 -nodes -in kafka.client.keystore.jks -out /tmp/private_key.pem

最后,拷贝一份ACM签名后的证书:

mv signed-certificate-from-acm /tmp/client_cert.pem

Python代码可以参考以下内容:

import boto3
import base64
import json
from kafka import KafkaConsumer
from kafka import TopicPartition

def get_secret():

    secret_name = "AmazonMSK_Sec"
    region_name = "us-east-1"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    get_secret_value_response = client.get_secret_value(SecretId=secret_name)

    if 'SecretString' in get_secret_value_response:
        secret = get_secret_value_response['SecretString']
    else:
        secret = base64.b64decode(get_secret_value_response['SecretBinary'])

    return json.loads(secret)

TOPIC = "mytopic"
secret = get_secret()
       
consumer = KafkaConsumer(bootstrap_servers=BootstrapBroker-String,
                                 security_protocol='SASL_SSL',
                                 sasl_mechanism='SCRAM-SHA-512',
                                 sasl_plain_username=secret['username'],
                                 sasl_plain_password=secret['password']
)

# Read and print all messages from test topic
parts = consumer.partitions_for_topic(TOPIC)
if parts is None:
    exit(1)
partitions = [TopicPartition(TOPIC, p) for p in parts]
consumer.assign(partitions)
for  partition in partitions:
    consumer.seek_to_end(partition)
for msg in consumer:
    print(msg)

查看消息:

以上,验证成功。

小结

本文手把手地介绍了如何配置Amazon MSK集群的Public Access选项,以及通过 SASL/SCRAM 身份验证的方式来安全地访问集群。Amazon MSK提供了三种可以公开访问集群的机制 – IAM 访问控制、SASL/SCRAM 以及 mTLS 身份验证,用户可以根据实际场景针对三种不同的认证方式进行选择并使用。

参考资料

https://docs.aws.amazon.com/msk/latest/developerguide/public-access.html

https://docs.aws.amazon.com/msk/latest/developerguide/msk-update-security.html

https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html

https://docs.aws.amazon.com/msk/latest/developerguide/msk-working-with-encryption.html

https://catalog.us-east-1.prod.workshops.aws/workshops/c2b72b6f-666b-4596-b8bc-bafa5dcca741/en-US/securityencryption/saslscram/secrets

https://aws.amazon.com/cn/blogs/china/how-to-safely-access-amazon-managed-streaming-for-apache-kafka-amazon-msk-cluster-through-the-internet-i/

本篇作者

史天

亚马逊云科技资深解决方案架构师。拥有丰富的云计算、数据分析和机器学习经验,目前致力于数据科学、机器学习、无服务器等领域的研究和实践。译有《机器学习即服务》《基于Kubernetes的DevOps实践》《Kubernetes微服务实战》《Prometheus监控实战》《云原生时代的CoreDNS学习指南》等。

齐海澎

齐海澎,亚马逊云科技资深解决方案架构师,亚马逊云科技游戏技术社区中国区负责人,有多年Devops经验,擅长各种类型游戏架构与运维解决方案,致力于Amazon GameTech的推广,在Gamelift等游戏服务上有丰富的实践经验。