亚马逊AWS官方博客
基于亚马逊云科技托管 Flink 的开发系列 — 本地开发环境篇
1. 概述
随着 2023 年 8 月亚马逊云科技把原来 Amazon Kinesis Data Analytics 更名为 Amazon Managed Service for Apache Flink,这一改变凸显了 Flink 在流式数据处理方面的领导地位。Flink 拥有高效的流式数据处理引擎,在性能、事件时间处理、故障处理等技术上具有领先优势。Flink 已被众多知名企业应用在搜索推荐、实时分析、欺诈检测等关键场景中,具有良好的实践验证。
本系列将以一个从未接触 Flink 的用户实际出发,讲述从本地开发,部署,以及解决一些特定问题,希望能帮助到有兴趣开发 PyFlink(即 Flink-Python)的应用,并部署到 Amazon Managed Service for Apache Flink 的用户(本文以亚马逊云科技中国北京区域为例)。
2. Amazon Managed Service for Apache Flink 简介
Amazon Managed Service for Apache Flink 是一项托管的服务,用户无需管理基础设施。通过自动扩缩容,优化资源使用,节省成本。同时确保高可用,自动检测并重启失败的 Flink 组件。另外使用了官方的 Apache Flink 开源代码,可以实现无缝迁移。
Amazon Managed Service for Apache Flink 支持使用 Java,Scala 和 Python 语言编写的应用,考虑客户擅长 Python,本文将以 Python 为例。
![]() |
3. 本地开发环境设置
日常开发 Flink 程序时,可以在本地先调试完成后,再部署到托管的 Flink 上,这样既提高了开发效率,也可以节省开发成本。
本系列以亚马逊云科技官方案例 https://github.com/aws-samples/pyflink-getting-started 为基础,针对中国区(北京区域和宁夏区域)增加了特别配置来顺利执行写入 S3 的操作,以及如何制作 uber jar,连接 TLS 认证的 Apache Kafka(在后续文章中)。在实际验证过程中,笔者发现在 Windows 平台上部署时,在使用 UDF 情况下会出现 Failed to create stage bundle factory 的错误,研究许久后也无法解决,所以如果需要用到 UDF 的话,建议使用 Linux 或者 MacOS。下文将以 Amazon Linux 2 为例,使用 MacOS 的同学可以参考 github 上原文。该 Linux 基于 Amazon Linux 2 + Mate Deskstop,具体可以参考 https://docs.amazonaws.cn/en_us/AWSEC2/latest/UserGuide/amazon-linux-ami-mate.html。
3.1 开发工具安装
虽然众多工具都可以用来开发 PyFlink 的程序,但是笔者还是会推荐使用 PyCharm,因为它在运行程序和调试程序上功能强大,结合 Amazon Q CoderWhisperer Toolkit plugin,让写代码也可以很轻松。
首先从 JetBrains 网站上下载 PyCharm 的安装包,个人开发者可以使用 Community 版本,而 Professional 版本提供了更多高级功能,适合企业用户使用。Linux 上下载后解压,通过下面命令启动:
为了方便,也可以把 PyCharm 加入菜单中,在 Tools 里面选择 Create Desktop Entry 即可,最后效果如下图:
![]() |
3.2 语言环境准备
因为现有 Amazon Managed Service for Apache Flink 最高支持 Flink 版本是 1.15,按照 jeremyber-aws 的说法,最好使用 Python 3.8。下面是简单的 过程,为了读者方便,将 github 上过程搬运过来,并按照 Linux 环境做了修改:
- 从官网(https://docs.anaconda.com/free/miniconda/miniconda-hashes)或者清华镜像源(https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda)下载 Miniconda3-py38_23.10.0-1-Linux-x86_64.sh
- 执行安装,需要接受 License terms,使用默认路径 /home/ec2-user/miniconda3
- 修改 .bashrc 文件中 PATH 环境变量,将 miniconda 加入,方便日后执行:
然后执行:
- 激活 conda
- 创建 pyflink 虚拟环境(名字可以自取)
- 安装 Java 11 环境
3.3 PyCharm 设置
首先在 PyCharm 选择 Get from VCS https://github.com/aws-samples/pyflink-getting-started.git,将代码克隆到本地。
在 File->Settings->Project->Python Interpreter 中,增加新的 Interpreter,选择 Conda 环境中在上文创建的虚拟 Python 环境:
![]() |
另外,在 Settings->Plugins 里面,安装 AWS Toolkit – Amazon Q, CodeWhisperer, and more 插件:
![]() |
3.4 Amazon CLI 设置
为了能访问 Amazon Kinesis 和 S3 等资源,需要在开发环境上配置相应的亚马逊云科技的 IAM AKSK。一开始,我们可以只给予 AmazonKinesisFullAccess 的权限,后续根据功能的需求再逐渐增加。安装 Amazon CLI 的步骤可以参考官方文档:https://docs.amazonaws.cn/en_us/cli/latest/userguide/getting-started-install.html。
然后使用 aws configure 来配置该用户的 AKSK,并作为 default 的 profile。如果想配置成其他 profile,后续在 PyCharm 中运行程序时需要额外设置。
4. 开始第一个程序
现在我们可以开始准备尝试第一个 Flink 程序了。pyflink-examples/GettingStarted 下面是一个以 Amazon Kinesis Stream 为数据源,然后再写入到另外一个 Amazon Kinesis Stream 的程序。
4.1 创建 Amazon Kinesis Stream
使用按需模式来创建两个 Kinesis Stream,分别为 input-stream 和 output-stream,其他配置都按照默认。
![]() |
4.2 生成测试数据
现在作为数据源的 input-stream 还没有任何数据,我们通过 datagen/stock.py 代码来生成一些虚拟的股票价格来输入到 input-stream 中。笔者修改了该代码的两个细节,一是增加 sleep 时间,避免数据产生太多;二是修改了 PartitionKey 部分,避免所有数据都使用同一个 key,从而导致数据到写入到一个 Shard,在后续测试中会产生等待数据的问题。另外 region 也改为北京区域 cn-north-1:
4.3 准备相关 Java 包
Flink 访问 Kinesis Stream 需要相应的 jar 包,从 Maven 仓库中下载 flink-sql-connector-kinesis-1.15.2.jar,然后放入到 GettingStarted 的 lib 目录(需要创建)。
4.4 更新应用属性
修改 GettingStarted/application_properties.json 文件中 aws.region 配置,分别是第 14 和 22 行,改成“cn-north-1”。
4.5 修改运行配置
在 getting-started.py 文件上右键选择 Modify Run Configuration…,
![]() |
在 Run 下面的 interpreter 中选择之前配置的 flink-env,在 Environment variables 中增加 IS_LOCAL=true,最后如果配置了额外的 Amazon CLI profile,要选择相应的。
![]() |
4.6 运行程序
现在我们就可以开始运行程序了,在右上角的运行栏中选择 getting-started,点击右边绿色三角形按钮就可以开始运行程序了。
![]() |
如果一切顺利,下方 Run 框里面没有错误信息:
![]() |
4.7 查看日志和 Flink Dashboard
程序运行的日志位于 pyflink 目录中,比如:/home/ec2-user/miniconda3/envs/flink-env/lib/python3.8/site-packages/pyflink/log/flink-ec2-user-python-ip-172-31-xxx-xxx.cn-north-1.compute.internal.log。
打开该文件,查找 Web frontend,就可以找到如下消息,其中 http://localhost:35633 就是 Flink 的 Web UI。注意:这个端口号每次都会变化。
2024-03-01 14:40:31,354 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] – Web frontend listening at http://localhost:35633.
用浏览器打开该网址,就可以查看 Flink 当前运行的任务以及其相关信息:
![]() |
4.8 验证结果
转到亚马逊云科技的控制台,在 Kinesis 的 Monitor 面板,可以看到 input-stream 的 GetRecords 监控数据表示有数据被读取:
![]() |
在 output-stream 的 Incoming data 中,可以看到数据被写入:
![]() |
4.9 更直观的结果
上面要查看 Kinesis output-stream 的数据,需要额外的方法去显示。这边把代码稍作修改,将输出改到 Console,就可以直接看到程序运行的结果了:
修改 getting-start.py 的第 161-164 行,将 create_sink_table 改成 create_print_table。这将修改 Flink 的 connector 为 print,从而将结果输出到 Console 上。
再次运行程序后,就可以看到数据输出了:
![]() |
5. 部署上云
程序在本地开发调试完成后,就可以部署到云上了。
5.1 打包
代码需要打包成 zip 格式,再上传到 S3 上,才能部署到托管的 Flink 上。
![]() |
然后将 GettingStarted.zip 上传到 S3 上。
5.2 创建 Flink 应用
在亚马逊云科技的控制台上,在 Managed Apache Flink 中创建 Flink Application,选择 Apache Flink 1.15 版本,Templates 可以选择 Development。
5.3 部署
在创建的 Flink 应用中,选择 Configure,按照如下来设置:
- 选择 zip 的 S3 路径。
- 在 Runtime properties 中设置如下:
Group ID | Key | Value |
kinesis.analytics.flink.run.options | jarfile | GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar |
kinesis.analytics.flink.run.options | python | GettingStarted/getting-started.py |
consumer.config.0 | input.stream.name | input-stream |
consumer.config.0 | aws.region | cn-north-1 |
consumer.config.0 | scan.stream.initpos | LATEST |
producer.config.0 | output.stream.name | output-stream |
producer.config.0 | aws.region | cn-north-1 |
producer.config.0 | shard.count | 1 |
部署完成后,就可以运行了。同样可以打开 Apache Flink Dashboard 查看运行状况,如果要查看日志,可以通过 CloudWatch Logs 来查看。
6. 结束语
本文演示了如何在本地(Linux 环境)搭建了 PyFlink 的开发环境,以便快速的进行开发和调试,并保证代码与 Amazon 托管的 Flink 兼容性,调试完后直接可以部署到云端。
接下来一篇将是从本地开发环境写入到 S3 的内容,敬请期待。
附录参考网址:
[1] https://github.com/aws-samples/pyflink-getting-started
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/