亚马逊AWS官方博客

基于亚马逊云科技托管 Flink 的开发系列 — SSL 认证的 Kafka 读取篇

1. 概述

上文讲述了如何建立在 Apache Flink 中将数据写入到 Amazon S3。这篇文章将继续讲述 Flink 开发的一个场景:读取采用 SSL 认证的 Apache Kafka 数据并将其写入 Amazon S3。这个场景是来自真实客户的需求。

2. 采用 SSL 认证的 Apache Kafka

Apache Kafka 是一个支持多分区、多副本的分布式消息流平台,同时也是一款开源的基于发布订阅模式的消息引擎系统。在大数据框架中,Kafka 经常承担着接收前端数据的功能,比如数据库的日志、应用打点日志、IoT 设备的数据等,Flink 就从 Kafka 中读取数据,进行数据处理,再往后端写入。

默认情况下,Apache Kafka 以明文形式发送所有数据,且不进行任何身份认证,但在企业实际应用中不会建议这么做。Kafka 支持基于 SSL 和基于 SASL 的安全认证机制。基于 SSL 的认证主要是指 Broker 和客户端的双路认证(2-way authentication)。通常来说,SSL 加密(Encryption)已经启用了单向认证,即客户端认证 Broker 的证书(Certificate)。如果要做 SSL 认证,那么我们要启用双路认证,也就是说 Broker 也要认证客户端的证书。限于篇幅,本文不再赘述SASL的认证方式。

在 Kafka 的 SSL 设置中,使用 Keystore 和 Truststore 保存证书和密钥。每个 Broker 都需要自己的 Keystore,其中包含私钥和公共证书。客户端使用其 Truststore 来验证该证书并信任服务器。同样,每个客户端也需要自己的 Keystore,其中包含私钥和公共证书。服务器使用其 Truststore 来验证和信任客户端的证书,并建立安全连接。

Truststore 可以包含一个可以签署证书的证书颁发机构(CA)。在这种情况下,Broker 或客户端会信任由 Truststore 中的 CA 签发的任何证书。这就简化了证书验证,因为添加新客户端或 Broker 无需更改 Truststore。

Flink 在连接 Kafka 时是作为一个客户端,所以需要使用相应的 Truststore 和 Keystore 文件。但在亚马逊云科技托管 Flink 的环境下,我们无法将这两个文件预先拷贝到各个节点上,这时我们可以使用下面的亚马逊云科技开源的 Apache Kafka Config Providers 来实现这个功能。

3. Apache Kafka Config Providers

Apache Kafka Config Providers 允许您在 Kafka Connect 或 Flink 程序中读取特定的配置,这些配置可以将密码用变量来表示,程序在运行时系统会解析这些变量。这样可以防止凭证和其他密钥以明文形式存储。Apache Kafka Config Providers 提供程序支持从 Amazon Secrets Manager、Amazon S3 和 Systems Manager (SSM) 检索配置参数,对于 S3 上的文件,会在运行时拷贝到每个工作节点上。关于该组件的代码可以参考附录链接[3]。

下面我们将先做些准备工作以使用该组件。

首先需要在 Amazon Secrets Manager 里面创建一个名为 kafak_ssl_credential 的密钥,设置 Truststore 和 Keystore 的密码:

其次,将 Truststore 和 Keystore 的 JKS 文件上传到 Amazon S3 上。

4. 制作 Fat Jar 文件

由于亚马逊云科技托管 Flink 的环境下无法把所有依赖的 Jar 文件直接部署到集群中,那就需要我们把所有依赖的 Jar 文件和程序生成的类(如果是 Java 或 Scala 代码)一起打包成一个 Fat Jar 文件。详细内容可以参考附录链接[4]。

  • 安装 Java 11 的 JDK 和 Maven 3.x
    (base) [ec2-user@ip-172-31-2-2 src]$ mvn -version
    Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
    Maven home: /home/ec2-user/Work/maven/apache-maven-3.8.6
    Java version: 11.0.16, vendor: Amazon.com Inc., runtime: /home/ec2-user/Downloads/amazon-corretto-11.0.16.8.1-linux-x64
    Default locale: en_US, platform encoding: UTF-8
    OS name: "linux", version: "4.14.287-215.504.amzn2.x86_64", arch: "amd64", family: "unix"
    
  • 克隆代码
  • 编辑 pom.xml 文件,将需要的 jar 包都放入到 dependencies 中

我们将使用五个 Jar 包,分别是 Apache Kafka Config Providers,Flink Kafka connector,Kafka Client 以及 fasterxml jaskson 的两个包。

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>msk-config-providers</artifactId>
    <version>0.2.0-all</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-parquet -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.8.10</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.10</version>
</dependency>
  • 运行 mvn clean package

如果一切顺利,会在 target 目录下生成 Fat Jar 文件: FatJarMaker-1.0-SNAPSHOT-combined.jar。如果你想要一个有意义的名字,可以在编译前修改 pom.xml 文件中 artifactId 栏目。

5. 开始测试程序

现在我们可以开始演示如何读取 Kafka 数据,然后写入 Amazon S3。其中有关本地程序写入 S3 的特殊设置,可以参考本系列上一篇文章的设置。由于之前亚马逊云科技的官方示例程序中没有包含相关内容,所以以下代码可以从附录链接[1]中取得,可以使用 git 命令将代码克隆到本地。

5.1 生成源数据

我们这里已经有一个使用 SSL 认证的 Apache Kafka 环境,同时拿到了对应的 Keystore 和 Truststore 文件。我们基于之前的 stock.py 生成股票价格的程序,编写了 stock-kafka.py 来将数据写入到 Kafka。因为 Python 的 Kafka 客户端不能直接使用 JKS 格式的 Keystore 和 Truststore,所以可以参考附录链接[2]的方法转换成 PEM 格式的证书。

然后我们运行 stock-kafka.py 来产生模拟股票价格的测试数据。

5.2 准备相关 Java 包

本次演示程序在 KafkaSSLConnectorFileSink 目录中,我们已经把上文生成的 FatJarMaker-1.0-SNAPSHOT-combined.jar 文件放到了 lib 目录中。

5.3 Apache Kafka Config Providers 配置

在本次程序中,我们使用到了 Apache Kafka Config Providers 的 secretmanager 和 s3import 两个功能,secretmanager 是读取存放在 Secrets Manager 中 Truststore 和 Keystore 的密码,s3import 是用来拷贝存放在 S3 上 Truststore 和 Keystore 文件到各个 Worker 节点上。

5.4 更新应用属性

修改 KafkaSSLConnectorFileSink/application_properties.json 文件中配置,包括 Kafka 服务器地址、Trustore 和 Keystore 的 S3 路径,最后文件输出的 S3 桶名。

5.5 修改运行配置

在 kafka-ssl-file-sink.py 文件上右键选择 Modify Run Configuration…

在 Run 下面的 interpreter 中选择之前配置的 flink-env,在 Environment variables 中增加两个变量,中间用分号相隔,其中 HADOOP_CONF_DIR 后面路径即是 PyFlink 安装目录的 conf 路径,请按照实际目录修改:

IS_LOCAL=true;HADOOP_CONF_DIR=/home/ec2-user/miniconda3/envs/flink-env/lib/python3.8/site-packages/pyflink/conf

最后如果配置了额外的 Amazon Command Line Interface (Amazon CLI) profile,要选择相应的 profile。

5.6 运行程序

现在我们就可以开始运行程序了,在右上角的运行栏中选择 kafka-ssl-file-sink.py,点击右边绿色三角形按钮就可以开始运行程序了。

如果一切顺利,下方 Run 框里面没有错误信息,并且可以看到 S3 插件的相关信息:

5.7 验证结果

运行一段时间后,转到亚马逊云科技的控制台,选择 S3 服务,在之前设置的 Bucket 里面,可以看到有 JSON 文件生成,这就是 Flink 写入成功了。这些文件是安装 ticker 来分区存放。

5.8 特别说明

Flink Task 的 Parallelism 数目默认是当前 CPU 核数,而 Kafka 的每个 Topic 都有不同的分区数目。如果 Parallelism 如果大于分区数,有些 Task 就会没有分配到分区,会导致 Flink 无法等到 Watermark 结束标志,从而无法进行窗口计算,一直产生不了 S3 文件,所以我们在代码中设置了缺省的 Parallelism 为 1,后续生成上可以根据实际需求设置。

另外一种方法是设置 table.exec.source.idle-timeout,强制在一定时间后进行窗口计算,具体可以参考 Flink 官方文档[5]。

6. 部署上云

部署上云时不用考虑之前 S3 的 endpoint 地址问题,因为托管的 Flink 中已经做好相应的配置,直接按照之前文章的方式打包部署就可以了。

同时,为了顺利访问 Secrets Manager 和 S3 上 Truststore 和 Keystore 文件,请赋予托管 Flink 所使用的 Role 相应的权限。

7. 结束语

本文演示了 Apache Flink 读取需要 SSL 证书访问 Apache Kafka 的数据,再写入到 Amazon S3 的过程,特别说明了如何使用 Apache Kafka Config Providers 这个组件来进行证书和密码的存储与使用。

下一篇,我们将会讲述在 Flink 中读取 MySQL binlog,然后写入到 Apache Hudi 的数据湖中,敬请期待。

参考链接

[1] https://github.com/xmubeta/pyflink-getting-started

[2] https://dev.to/adityakanekar/connecting-to-kafka-cluster-using-ssl-with-python-k2e

[3] https://github.com/aws-samples/msk-config-providers

[4] https://github.com/shankarps/

[5] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/config/#table-exec-source-idle-timeouts

本篇作者

周平

西云数据高级技术客户经理,致力于大数据技术的研究和落地,为亚马逊云科技中国客户提供企业级架构和技术支持。