亚马逊AWS官方博客

基于亚马逊云科技托管 Flink 的开发系列 — 本地开发环境篇

1. 概述

随着 2023 年 8 月亚马逊云科技把原来 Amazon Kinesis Data Analytics 更名为 Amazon Managed Service for Apache Flink,这一改变凸显了 Flink 在流式数据处理方面的领导地位。Flink 拥有高效的流式数据处理引擎,在性能、事件时间处理、故障处理等技术上具有领先优势。Flink 已被众多知名企业应用在搜索推荐、实时分析、欺诈检测等关键场景中,具有良好的实践验证。

本系列将以一个从未接触 Flink 的用户实际出发,讲述从本地开发,部署,以及解决一些特定问题,希望能帮助到有兴趣开发 PyFlink(即 Flink-Python)的应用,并部署到 Amazon Managed Service for Apache Flink 的用户(本文以亚马逊云科技中国北京区域为例)。

2. Amazon Managed Service for Apache Flink 简介

Amazon Managed Service for Apache Flink 是一项托管的服务,用户无需管理基础设施。通过自动扩缩容,优化资源使用,节省成本。同时确保高可用,自动检测并重启失败的 Flink 组件。另外使用了官方的 Apache Flink 开源代码,可以实现无缝迁移。

Amazon Managed Service for Apache Flink 支持使用 Java,Scala 和 Python 语言编写的应用,考虑客户擅长 Python,本文将以 Python 为例。

3. 本地开发环境设置

日常开发 Flink 程序时,可以在本地先调试完成后,再部署到托管的 Flink 上,这样既提高了开发效率,也可以节省开发成本。

本系列以亚马逊云科技官方案例 https://github.com/aws-samples/pyflink-getting-started 为基础,针对中国区(北京区域和宁夏区域)增加了特别配置来顺利执行写入 S3 的操作,以及如何制作 uber jar,连接 TLS 认证的 Apache Kafka(在后续文章中)。在实际验证过程中,笔者发现在 Windows 平台上部署时,在使用 UDF 情况下会出现 Failed to create stage bundle factory 的错误,研究许久后也无法解决,所以如果需要用到 UDF 的话,建议使用 Linux 或者 MacOS。下文将以 Amazon Linux 2 为例,使用 MacOS 的同学可以参考 github 上原文。该 Linux 基于 Amazon Linux 2 + Mate Deskstop,具体可以参考 https://docs.amazonaws.cn/en_us/AWSEC2/latest/UserGuide/amazon-linux-ami-mate.html

3.1 开发工具安装

虽然众多工具都可以用来开发 PyFlink 的程序,但是笔者还是会推荐使用 PyCharm,因为它在运行程序和调试程序上功能强大,结合 Amazon Q CoderWhisperer Toolkit plugin,让写代码也可以很轻松。

首先从 JetBrains 网站上下载 PyCharm 的安装包,个人开发者可以使用 Community 版本,而 Professional 版本提供了更多高级功能,适合企业用户使用。Linux 上下载后解压,通过下面命令启动:

sh bin/pycharm.sh

为了方便,也可以把 PyCharm 加入菜单中,在 Tools 里面选择 Create Desktop Entry 即可,最后效果如下图:

3.2 语言环境准备

因为现有 Amazon Managed Service for Apache Flink 最高支持 Flink 版本是 1.15,按照 jeremyber-aws 的说法,最好使用 Python 3.8。下面是简单的 过程,为了读者方便,将 github 上过程搬运过来,并按照 Linux 环境做了修改:

  1. 从官网(https://docs.anaconda.com/free/miniconda/miniconda-hashes)或者清华镜像源(https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda)下载 Miniconda3-py38_23.10.0-1-Linux-x86_64.sh
  2. 执行安装,需要接受 License terms,使用默认路径 /home/ec2-user/miniconda3
    bash Miniconda3-py38_23.10.0-1-Linux-x86_64.sh
  3. 修改 .bashrc 文件中 PATH 环境变量,将 miniconda 加入,方便日后执行:
    export PATH=~/miniconda3/bin:$PATH

    然后执行:

    source ~/.bashrc
  4. 激活 conda
    conda init
  5. 创建 pyflink 虚拟环境(名字可以自取)
    conda create -n flink-env pip python=3.8
    conda activate flink-env
    pip install apache-flink==1.15.2
    
  6. 安装 Java 11 环境
    sudo yum install java-11-amazon-corretto.x86_64
    $java -version
    openjdk version "11.0.16" 2022-07-19 LTS
    OpenJDK Runtime Environment Corretto-11.0.16.8.1 (build 11.0.16+8-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.16.8.1 (build 11.0.16+8-LTS, mixed mode)
    

3.3 PyCharm 设置

首先在 PyCharm 选择 Get from VCS https://github.com/aws-samples/pyflink-getting-started.git,将代码克隆到本地。

在 File->Settings->Project->Python Interpreter 中,增加新的 Interpreter,选择 Conda 环境中在上文创建的虚拟 Python 环境:

另外,在 Settings->Plugins 里面,安装 AWS Toolkit – Amazon Q, CodeWhisperer, and more 插件:

3.4 Amazon CLI 设置

为了能访问 Amazon Kinesis 和 S3 等资源,需要在开发环境上配置相应的亚马逊云科技的 IAM AKSK。一开始,我们可以只给予 AmazonKinesisFullAccess 的权限,后续根据功能的需求再逐渐增加。安装 Amazon CLI 的步骤可以参考官方文档:https://docs.amazonaws.cn/en_us/cli/latest/userguide/getting-started-install.html

然后使用 aws configure 来配置该用户的 AKSK,并作为 default 的 profile。如果想配置成其他 profile,后续在 PyCharm 中运行程序时需要额外设置。

4. 开始第一个程序

现在我们可以开始准备尝试第一个 Flink 程序了。pyflink-examples/GettingStarted 下面是一个以 Amazon Kinesis Stream 为数据源,然后再写入到另外一个 Amazon Kinesis Stream 的程序。

4.1 创建 Amazon Kinesis Stream

使用按需模式来创建两个 Kinesis Stream,分别为 input-stream 和 output-stream,其他配置都按照默认。

4.2 生成测试数据

现在作为数据源的 input-stream 还没有任何数据,我们通过 datagen/stock.py 代码来生成一些虚拟的股票价格来输入到 input-stream 中。笔者修改了该代码的两个细节,一是增加 sleep 时间,避免数据产生太多;二是修改了 PartitionKey 部分,避免所有数据都使用同一个 key,从而导致数据到写入到一个 Shard,在后续测试中会产生等待数据的问题。另外 region 也改为北京区域 cn-north-1:

import datetime
import json
import random
import boto3
import time

STREAM_NAME = "input-stream"
STREAM_REGION = "cn-north-1"


def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=data['event_time'][-2:])
        time.sleep(1)

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name=STREAM_REGION))

4.3 准备相关 Java 包

Flink 访问 Kinesis Stream 需要相应的 jar 包,从 Maven 仓库中下载 flink-sql-connector-kinesis-1.15.2.jar,然后放入到 GettingStarted 的 lib 目录(需要创建)。

4.4 更新应用属性

修改 GettingStarted/application_properties.json 文件中 aws.region 配置,分别是第 14 和 22 行,改成“cn-north-1”。

4.5 修改运行配置

在 getting-started.py 文件上右键选择 Modify Run Configuration…,

在 Run 下面的 interpreter 中选择之前配置的 flink-env,在 Environment variables 中增加 IS_LOCAL=true,最后如果配置了额外的 Amazon CLI profile,要选择相应的。

4.6 运行程序

现在我们就可以开始运行程序了,在右上角的运行栏中选择 getting-started,点击右边绿色三角形按钮就可以开始运行程序了。

如果一切顺利,下方 Run 框里面没有错误信息:

4.7 查看日志和 Flink Dashboard

程序运行的日志位于 pyflink 目录中,比如:/home/ec2-user/miniconda3/envs/flink-env/lib/python3.8/site-packages/pyflink/log/flink-ec2-user-python-ip-172-31-xxx-xxx.cn-north-1.compute.internal.log。

打开该文件,查找 Web frontend,就可以找到如下消息,其中 http://localhost:35633 就是 Flink 的 Web UI。注意:这个端口号每次都会变化。

2024-03-01 14:40:31,354 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] – Web frontend listening at http://localhost:35633.

用浏览器打开该网址,就可以查看 Flink 当前运行的任务以及其相关信息:

4.8 验证结果

转到亚马逊云科技的控制台,在 Kinesis 的 Monitor 面板,可以看到 input-stream 的 GetRecords 监控数据表示有数据被读取:

在 output-stream 的 Incoming data 中,可以看到数据被写入:

4.9 更直观的结果

上面要查看 Kinesis output-stream 的数据,需要额外的方法去显示。这边把代码稍作修改,将输出改到 Console,就可以直接看到程序运行的结果了:

修改 getting-start.py 的第 161-164 行,将 create_sink_table 改成 create_print_table。这将修改 Flink 的 connector 为 print,从而将结果输出到 Console 上。

# 3. Creates a sink table writing to a Kinesis Data Stream
#table_env.execute_sql(
#    create_sink_table(output_table_name, output_stream, output_region, stream_initpos)
#)

table_env.execute_sql(
    create_print_table(output_table_name, output_stream, output_region, stream_initpos)
)

再次运行程序后,就可以看到数据输出了:

5. 部署上云

程序在本地开发调试完成后,就可以部署到云上了。

5.1 打包

代码需要打包成 zip 格式,再上传到 S3 上,才能部署到托管的 Flink 上。

$cd pyflink-examples
$zip -r GettingStarted.zip pyflink-examples/GettingStarted

然后将 GettingStarted.zip 上传到 S3 上。

5.2 创建 Flink 应用

在亚马逊云科技的控制台上,在 Managed Apache Flink 中创建 Flink Application,选择 Apache Flink 1.15 版本,Templates 可以选择 Development。

5.3 部署

在创建的 Flink 应用中,选择 Configure,按照如下来设置:

  1. 选择 zip 的 S3 路径。
  2. 在 Runtime properties 中设置如下:
Group ID Key Value
kinesis.analytics.flink.run.options jarfile GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar
kinesis.analytics.flink.run.options python GettingStarted/getting-started.py
consumer.config.0 input.stream.name input-stream
consumer.config.0 aws.region cn-north-1
consumer.config.0 scan.stream.initpos LATEST
producer.config.0 output.stream.name output-stream
producer.config.0 aws.region cn-north-1
producer.config.0 shard.count 1

部署完成后,就可以运行了。同样可以打开 Apache Flink Dashboard 查看运行状况,如果要查看日志,可以通过 CloudWatch Logs 来查看。

6. 结束语

本文演示了如何在本地(Linux 环境)搭建了 PyFlink 的开发环境,以便快速的进行开发和调试,并保证代码与 Amazon 托管的 Flink 兼容性,调试完后直接可以部署到云端。

接下来一篇将是从本地开发环境写入到 S3 的内容,敬请期待。

附录参考网址:

[1] https://github.com/aws-samples/pyflink-getting-started

[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/

本篇作者

周平

西云数据高级技术客户经理,致力于大数据技术的研究和落地,为亚马逊云科技中国客户提供企业级架构和技术支持。