亚马逊AWS官方博客
基于亚马逊云科技托管 Flink 的开发系列 — 本地开发环境篇
1. 概述
随着 2023 年 8 月亚马逊云科技把原来 Amazon Kinesis Data Analytics 更名为 Amazon Managed Service for Apache Flink,这一改变凸显了 Flink 在流式数据处理方面的领导地位。Flink 拥有高效的流式数据处理引擎,在性能、事件时间处理、故障处理等技术上具有领先优势。Flink 已被众多知名企业应用在搜索推荐、实时分析、欺诈检测等关键场景中,具有良好的实践验证。
本系列将以一个从未接触 Flink 的用户实际出发,讲述从本地开发,部署,以及解决一些特定问题,希望能帮助到有兴趣开发 PyFlink(即 Flink-Python)的应用,并部署到 Amazon Managed Service for Apache Flink 的用户(本文以亚马逊云科技中国北京区域为例)。
2. Amazon Managed Service for Apache Flink 简介
Amazon Managed Service for Apache Flink 是一项托管的服务,用户无需管理基础设施。通过自动扩缩容,优化资源使用,节省成本。同时确保高可用,自动检测并重启失败的 Flink 组件。另外使用了官方的 Apache Flink 开源代码,可以实现无缝迁移。
Amazon Managed Service for Apache Flink 支持使用 Java,Scala 和 Python 语言编写的应用,考虑客户擅长 Python,本文将以 Python 为例。
3. 本地开发环境设置
日常开发 Flink 程序时,可以在本地先调试完成后,再部署到托管的 Flink 上,这样既提高了开发效率,也可以节省开发成本。
本系列以亚马逊云科技官方案例 https://github.com/aws-samples/pyflink-getting-started 为基础,针对中国区(北京区域和宁夏区域)增加了特别配置来顺利执行写入 S3 的操作,以及如何制作 uber jar,连接 TLS 认证的 Apache Kafka(在后续文章中)。在实际验证过程中,笔者发现在 Windows 平台上部署时,在使用 UDF 情况下会出现 Failed to create stage bundle factory 的错误,研究许久后也无法解决,所以如果需要用到 UDF 的话,建议使用 Linux 或者 MacOS。下文将以 Amazon Linux 2 为例,使用 MacOS 的同学可以参考 github 上原文。该 Linux 基于 Amazon Linux 2 + Mate Deskstop,具体可以参考 https://docs.amazonaws.cn/en_us/AWSEC2/latest/UserGuide/amazon-linux-ami-mate.html。
3.1 开发工具安装
虽然众多工具都可以用来开发 PyFlink 的程序,但是笔者还是会推荐使用 PyCharm,因为它在运行程序和调试程序上功能强大,结合 Amazon Q CoderWhisperer Toolkit plugin,让写代码也可以很轻松。
首先从 JetBrains 网站上下载 PyCharm 的安装包,个人开发者可以使用 Community 版本,而 Professional 版本提供了更多高级功能,适合企业用户使用。Linux 上下载后解压,通过下面命令启动:
为了方便,也可以把 PyCharm 加入菜单中,在 Tools 里面选择 Create Desktop Entry 即可,最后效果如下图:
3.2 语言环境准备
因为现有 Amazon Managed Service for Apache Flink 最高支持 Flink 版本是 1.15,按照 jeremyber-aws 的说法,最好使用 Python 3.8。下面是简单的 过程,为了读者方便,将 github 上过程搬运过来,并按照 Linux 环境做了修改:
- 从官网(https://docs.anaconda.com/free/miniconda/miniconda-hashes)或者清华镜像源(https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda)下载 Miniconda3-py38_23.10.0-1-Linux-x86_64.sh
- 执行安装,需要接受 License terms,使用默认路径 /home/ec2-user/miniconda3
- 修改 .bashrc 文件中 PATH 环境变量,将 miniconda 加入,方便日后执行:
然后执行:
- 激活 conda
- 创建 pyflink 虚拟环境(名字可以自取)
- 安装 Java 11 环境
3.3 PyCharm 设置
首先在 PyCharm 选择 Get from VCS https://github.com/aws-samples/pyflink-getting-started.git,将代码克隆到本地。
在 File->Settings->Project->Python Interpreter 中,增加新的 Interpreter,选择 Conda 环境中在上文创建的虚拟 Python 环境:
另外,在 Settings->Plugins 里面,安装 AWS Toolkit – Amazon Q, CodeWhisperer, and more 插件:
3.4 Amazon CLI 设置
为了能访问 Amazon Kinesis 和 S3 等资源,需要在开发环境上配置相应的亚马逊云科技的 IAM AKSK。一开始,我们可以只给予 AmazonKinesisFullAccess 的权限,后续根据功能的需求再逐渐增加。安装 Amazon CLI 的步骤可以参考官方文档:https://docs.amazonaws.cn/en_us/cli/latest/userguide/getting-started-install.html。
然后使用 aws configure 来配置该用户的 AKSK,并作为 default 的 profile。如果想配置成其他 profile,后续在 PyCharm 中运行程序时需要额外设置。
4. 开始第一个程序
现在我们可以开始准备尝试第一个 Flink 程序了。pyflink-examples/GettingStarted 下面是一个以 Amazon Kinesis Stream 为数据源,然后再写入到另外一个 Amazon Kinesis Stream 的程序。
4.1 创建 Amazon Kinesis Stream
使用按需模式来创建两个 Kinesis Stream,分别为 input-stream 和 output-stream,其他配置都按照默认。
4.2 生成测试数据
现在作为数据源的 input-stream 还没有任何数据,我们通过 datagen/stock.py 代码来生成一些虚拟的股票价格来输入到 input-stream 中。笔者修改了该代码的两个细节,一是增加 sleep 时间,避免数据产生太多;二是修改了 PartitionKey 部分,避免所有数据都使用同一个 key,从而导致数据到写入到一个 Shard,在后续测试中会产生等待数据的问题。另外 region 也改为北京区域 cn-north-1:
4.3 准备相关 Java 包
Flink 访问 Kinesis Stream 需要相应的 jar 包,从 Maven 仓库中下载 flink-sql-connector-kinesis-1.15.2.jar,然后放入到 GettingStarted 的 lib 目录(需要创建)。
4.4 更新应用属性
修改 GettingStarted/application_properties.json 文件中 aws.region 配置,分别是第 14 和 22 行,改成“cn-north-1”。
4.5 修改运行配置
在 getting-started.py 文件上右键选择 Modify Run Configuration…,
在 Run 下面的 interpreter 中选择之前配置的 flink-env,在 Environment variables 中增加 IS_LOCAL=true,最后如果配置了额外的 Amazon CLI profile,要选择相应的。
4.6 运行程序
现在我们就可以开始运行程序了,在右上角的运行栏中选择 getting-started,点击右边绿色三角形按钮就可以开始运行程序了。
如果一切顺利,下方 Run 框里面没有错误信息:
4.7 查看日志和 Flink Dashboard
程序运行的日志位于 pyflink 目录中,比如:/home/ec2-user/miniconda3/envs/flink-env/lib/python3.8/site-packages/pyflink/log/flink-ec2-user-python-ip-172-31-xxx-xxx.cn-north-1.compute.internal.log。
打开该文件,查找 Web frontend,就可以找到如下消息,其中 http://localhost:35633 就是 Flink 的 Web UI。注意:这个端口号每次都会变化。
2024-03-01 14:40:31,354 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] – Web frontend listening at http://localhost:35633.
用浏览器打开该网址,就可以查看 Flink 当前运行的任务以及其相关信息:
4.8 验证结果
转到亚马逊云科技的控制台,在 Kinesis 的 Monitor 面板,可以看到 input-stream 的 GetRecords 监控数据表示有数据被读取:
在 output-stream 的 Incoming data 中,可以看到数据被写入:
4.9 更直观的结果
上面要查看 Kinesis output-stream 的数据,需要额外的方法去显示。这边把代码稍作修改,将输出改到 Console,就可以直接看到程序运行的结果了:
修改 getting-start.py 的第 161-164 行,将 create_sink_table 改成 create_print_table。这将修改 Flink 的 connector 为 print,从而将结果输出到 Console 上。
再次运行程序后,就可以看到数据输出了:
5. 部署上云
程序在本地开发调试完成后,就可以部署到云上了。
5.1 打包
代码需要打包成 zip 格式,再上传到 S3 上,才能部署到托管的 Flink 上。
然后将 GettingStarted.zip 上传到 S3 上。
5.2 创建 Flink 应用
在亚马逊云科技的控制台上,在 Managed Apache Flink 中创建 Flink Application,选择 Apache Flink 1.15 版本,Templates 可以选择 Development。
5.3 部署
在创建的 Flink 应用中,选择 Configure,按照如下来设置:
- 选择 zip 的 S3 路径。
- 在 Runtime properties 中设置如下:
Group ID | Key | Value |
kinesis.analytics.flink.run.options | jarfile | GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar |
kinesis.analytics.flink.run.options | python | GettingStarted/getting-started.py |
consumer.config.0 | input.stream.name | input-stream |
consumer.config.0 | aws.region | cn-north-1 |
consumer.config.0 | scan.stream.initpos | LATEST |
producer.config.0 | output.stream.name | output-stream |
producer.config.0 | aws.region | cn-north-1 |
producer.config.0 | shard.count | 1 |
部署完成后,就可以运行了。同样可以打开 Apache Flink Dashboard 查看运行状况,如果要查看日志,可以通过 CloudWatch Logs 来查看。
6. 结束语
本文演示了如何在本地(Linux 环境)搭建了 PyFlink 的开发环境,以便快速的进行开发和调试,并保证代码与 Amazon 托管的 Flink 兼容性,调试完后直接可以部署到云端。
接下来一篇将是从本地开发环境写入到 S3 的内容,敬请期待。
附录参考网址:
[1] https://github.com/aws-samples/pyflink-getting-started
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/