在 Amazon EMR 大数据计算引擎中,提供了 Apache Flink 执行框架,底层使用 YARN 作为资源管理。虽然,Amazon EMR 提供了 Zeppelin Notebook 可以让用户以 Notebook 的方式编写并调试 Flink 代码与SQL,但是对于可视化的任务提交、管理、监控、跟踪等功能,无法通过 Notebook 的方式完成。针对此痛点,在本方案中我们引入了开源的实时计算平台 Dinky,将 Flink 任务的开发、管理与 Amazon EMR Flink 完美结合,完善了基于 Amazon EMR 的数据分析平台的实时任务开发功能。
Dinky 介绍
整体架构
Dinky 是一个开箱即用的一站式实时计算平台,以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架,提供敏捷的 Flink SQL、Flink Jar 作业开发、部署及监控能力,助力实时计算高效应用,致力于流批一体和湖仓一体的建设与实践。
IDE 式开发
Dinky 提供一个轻量级的 IDE 式开发环境,提供一站式开发能力,从语句编写、调试、提交 到 监控、发布、丝滑流畅,解决 sql 作业文件多,管理困难, 编写困难等问题,还支持智能代码提示,Env 参数,全局变量等,让开发更简单,顺滑。支持 Flink Sql、Flink Jar、FlinkCDC 同步等功能。
细化运维管理
Dinky 无缝支持流批一体,Yarn,K8s,Standalone,任务提交管理全方位支持,运维中心对原有 Flink WebUI 进行增强,提供持久化监控,个性化告警规则配置,智能重启,停止与 savepoint 管理等功能。另外还提供了 rs 文件系统拓展、UDF 管理、元数据管理等功能。
下面我们介绍如何将 Dinky 与 Amzon EMR Flink 计算引擎进行集成。
安装配置
环境准备
- 首先,启动一个 EMR Flink 集群,通过 EMR 集群主节点/etc/flink/conf/flink-conf.yaml 配置文件查看 EMR Flink JRE 版本,比如:
## By default EMR sets Java 8 as default java runtime for flink
env.java.home: /usr/lib/jvm/java-1.8.0-amazon-corretto
containerized.master.env.JAVA_HOME: /usr/lib/jvm/java-1.8.0-amazon-corretto
containerized.taskmanager.env.JAVA_HOME: /usr/lib/jvm/java-1.8.0-amazon-corretto
- 安装并启动一个 MySQL 服务,或者使用一个外部的 MySQL 服务(例如 Amazon RDS MySQL 8.0)
- 启动一台 EC2 服务器作为 Dinky 服务器,这里我们使用了 m6i.xlarge 实例类型,100G EBS 硬盘,操作系统使用 Amazon Linux 2023。为了后续能与 Amazon EMR 正常进行通信,需保证此 EC2 实例的网络路由和安全组配置能与 Amazon EMR 主节点、核心节点能够互通
- 下载 Dinky 1.0.3 版本到 EC2 ,解压,本文假设 Dinky 安装目录在/opt/dinky/
cd /opt/
sudo wget https://github.com/DataLinkDC/dinky/releases/download/v1.0.3/dinky-release-1.17-1.0.3.tar.gz
sudo tar -xvf dinky-release-1.17-1.0.3.tar.gz
sudo mv dinky-release-1.17-1.0.3 dinky
- 安装 jdk,根据第一步结果,在 Dinky 服务器安装 jdk
sudo amazon-linux-extras enable corretto8
sudo yum install java-1.8.0-amazon-corretto
sudo yum install java-1.8.0-amazon-corretto-devel
数据库初始化
Dinky 采用 MySQL 作为后端的存储库,MySQL 支持 5.7+。这里假设你已经安装了 MySQL 。首先需要创建 Dinky 的后端数据库,这里以配置文件中提供的默认库进行创建。
在 Dinky 根目录 sql 文件夹下分别放置了 dinky-mysql.sql 、 upgrade/${version}_schema/mysql/ddl 和 dml。如果第一次部署,可以直接将 sql/dinky-mysql.sql 文件在 dinky 数据库下执行。(如果之前已经部署,那 upgrade 目录下存放了各版本的升级 sql ,根据版本号按需执行即可)
- 创建 dinky 用户和权限
#登录mysql
mysql -uroot -p
#创建数据库
mysql>
CREATE DATABASE dinky;
#创建用户并允许远程登录
mysql>
create user 'dinky'@'%' IDENTIFIED WITH mysql_native_password by 'dinky';
#授权
mysql>
grant ALL PRIVILEGES ON dinky.* to 'dinky'@'%';
mysql>
flush privileges;
- 导入初始化数据
#首先登录 mysql
mysql -h host -udinky -pdinky
mysql> use dinky;
mysql> source /opt/dinky/sql/dinky-mysql.sql
修改 Dinky 安装配置
- 修改 Dinky 配置文件,选择默认数据源为 mysql
cd /opt/dinky/config/
vim application.yml
# 修改 Dinky 所使用的数据库类型为 mysql
spring:
application:
name: Dinky
profiles:
# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2]
# If you use mysql database, please configure mysql database connection information in application-mysql.yml
# If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml
# If you use the h2 database, please configure the h2 database connection information in application-h2.yml,
# note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with caution
active: ${DB_ACTIVE:mysql} #[h2,mysql,pgsql]
- 修改 Dinky 的 mysql 的配置文件
vim application-mysql.yml
# 修改 Dinky 的 mysql 链接配置
spring:
datasource:
url: jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: ${MYSQL_USERNAME:dinky}
password: ${MYSQL_PASSWORD:dinky}
driver-class-name: com.mysql.cj.jdbc.Driver
- 复制 EMR 配置文件到 Dinky 服务器
sudo mkdir -p /etc/flink/conf
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/etc/alternatives/flink-conf/* /etc/flink/conf/
sudo mkdir -p /etc/hadoop/conf
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/etc/alternatives/hadoop-conf/* /etc/hadoop/conf/
准备和上传依赖文件
- 准备 mysql jdbc 驱动 jar 包
sudo wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
sudo mv mysql-connector-java-8.0.28.jar /opt/dinky/lib/
sudo chmod 777 /opt/dinky/lib/mysql-connector-java-8.0.28.jar
- Dinky 需要具备内置的 Flink 环境,将 EMR Flink 相关环境 jar 包复制到 Dinky 服务器
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/usr/lib/flink/lib/* /opt/dinky/extends/flink1.17/
sudo rm /opt/dinky/extends/flink1.17/flink-table-planner-loader-1.17.1-amzn-1.jar
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/usr/lib/flink/opt/flink-table-planner_2.12-1.17.1-amzn-1.jar /opt/dinky/extends/flink1.17/
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/usr/lib/hadoop/*.jar /opt/dinky/customJar/
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/usr/lib/hadoop/client/*.jar /opt/dinky/customJar/
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/usr/share/aws/emr/emrfs/lib/*.jar /opt/dinky/customJar/
sudo scp -r -i /home/ec2-user/key.pem hadoop@<EMR Master IP>:/usr/lib/flink/plugins/s3/*.jar /opt/dinky/customJar/
配置 EMR Flink 环境
Dinky 任务在提交到 EMR 后,需要在 HDFS 中寻找依赖 jar 包,所以需要将 dinky 的部分 jar 包上传到 EMR HDFS。
#以下命令在Dinky服务器执行
sudo scp -r -i /home/ec2-user/key.pem /opt/dinky/jar/dinky-app-1.17-1.0.3-jar-with-dependencies.jar hadoop@<EMR Master IP>:/home/hadoop/dinky/
sudo scp -r -i /home/ec2-user/key.pem /opt/dinky/lib/mysql-connector-java-8.0.28.jar hadoop@<EMR Master IP>:/home/hadoop/dinky/
#以下命令在EMR Master节点执行
hdfs dfs -mkdir /user/hadoop/dinky/
hdfs dfs -mkdir /user/hadoop/flink/
hdfs dfs -mkdir /user/hadoop/flink/lib/
hdfs dfs -put /home/hadoop/dinky/dinky-app-1.17-1.0.3-jar-with-dependencies.jar /user/hadoop/dinky/
hdfs dfs -put /home/hadoop/dinky/mysql-connector-java-8.0.28.jar /user/hadoop/flink/lib/
hdfs dfs -put /usr/lib/flink/lib/* /user/hadoop/flink/lib/
hdfs dfs -put /usr/lib/flink/plugins/s3/* /user/hadoop/flink/lib/
hdfs dfs -rm /user/hadoop/flink/lib/flink-table-planner-loader-1.17.1-amzn-1.jar
hdfs dfs -put /usr/lib/flink/opt/flink-table-planner_2.12-1.17.1-amzn-1.jar /user/hadoop/flink/lib/
启动 Dinky
# 启动
cd /opt/dinky/
sudo bash auto.sh start
# 查看启动日志
tail -fn1000 /opt/dinky/logs/dinky-start.log
# 停止
sudo bash auto.sh stop
使用默认用户名/密码:admin/dinky123!@# 浏览器登陆 ip:8888 即可进入 Dinky 界面
部署和提交 Flink 任务
任务提交模式
Dinky 支持多种任务提交模式,包括 LOCAL 本地调试模式、K8s 模式、Standalone 模式、YarnSession 模式、Application 模式等,当我们提交任务到 EMR Flink 时,我们采用 Application 模式提交。要使用 Application 模式提交任务,首先需要在 Dinky 注册中心配置 EMR Flink 集群。
在 Dinky 控制台点击注册中心,进入后点击左侧集群-集群配置,进入集群配置列表界面,然后点击新建,创建一个集群。
在新建集群界面,填入集群信息,任务类型选 Yarn Application,填写集群配置名称,Hadoop 配置文件路径,Flink Lib 路径,Flink Job 相关配置和 Jar 文件路径。
- 类型:选择 Yarn Application 模式
- 集群配置名称:集群配置名称, 用于区分不同集群配置
- Hadoop 配置文件路径:填写前面【复制 EMR 配置文件到 Dinky 服务器】步骤中的 Hadoop 配置文件的路径
- Flink Lib 路径:这里要注意,填写的是 HDFS 的路径;见步骤【配置 EMR Flink 环境】中创建的 Flink Lib 目录
- Flink 配置文件路径:填写前面【复制 EMR 配置文件到 Dinky 服务器】步骤中的 Flink 配置文件的路径
- Jar 文件路径:填写步骤【配置 EMR Flink 环境】中 dinky-app-1.17-1.0.3-jar-with-dependencies.jar 的 HDFS 路径
- 保存点路径和检查点路径:这里可以填写 S3 路径或 HDFS 路径,确保 EMR Master 节点有写入权限。注意如果填写 HDFS 路径时,由于 Dinky 任务以 root 用户提交,没有权限读取 HDFS,需要在 EMR Master 节点执行 hdfs dfs -chmod 777 /user/hadoop/flink,root 用户可以写入保存点和检查点
- 其他配置,根据自己的 Flink 任务需求配置
资源库配置
Dinky 在 v1.0.0 版本开始,提供了资源中心的功能,可以在 Dinky 中很方便地进行资源的管理,包括上传资源、删除资源(逻辑删除)、预览资源等。
需要注意的是,资源中心中的资源并不是指 Dinky 服务的资源,而是指 Dinky 服务所管理的资源,比如上传的文件,或者是 Git 项目的产物等,都可以在资源中心中进行管理。
在 Dinky v1.0.0 版本中,扩展了 rs 协议,可以通过 rs 协议访问资源中心中的资源,方便地进行资源的访问。
支持功能
- 支持托管 Git 构建任务的产物,包含 Jar Zip 等
- 支持上传资源
- 支持预览部分文件类型内容,如:配置文件、文本文件等
- 支持 rs 协议,不管原本的文件系统是什么,只要是由资源中心管理的,都可以通过 rs 协议访问
- 支持 Jar 任务提交时,通过 rs 协议访问资源中心中的 Jar 资源,并适用于 Flink 全模式任务的提交
请注意:
- 该功能依赖于资源配置,请确保已经配置了文件系统,配置详见文件系统配置,请在配置完成后再使用该功能,否则会出现异常;
- 虽然提供了删除功能,为了避免误操作,在 Dinky 中删除资源并不会删除文件系统中的文件,仅会删除 Dinky 的数据库中的记录,如需彻底删除,请自行手动删除文件系统中的文件;
- 如果上传同名文件,会覆盖原有文件,请谨慎操作;
- 资源中心默认有一层逻辑根节点 Root,该节点目录不可删除,所有上传文件/新建目录,都归于该节点下,如果使用 rs 协议访问,请注意路径,不要带上 Root 节点。
使用方式
eg:假设在配置中心 → Resource 配置 中使用 LOCAL 协议,并设置了上传根路径为 /tmp/dinky/resource,那么可以通过以下方式访问资源中心中的资源,在使用过程中可以直接忽视上传根路径的配置。
# 假设基于以上配置, 在资源中心 Root 根目录下上传了一个 app.jar 文件,那么可以通过以下方式访问
rs:/app.jar
# 假设基于以上配置, 在资源中心 Root 根目录 新建了一个目录 test,并在 test 目录下上传了一个 app.jar 文件,那么可以通过以下方式访问
rs:/test/app.jar
FlinkSql 任务提交
在数据开发中创建 FlinkSql 作业
编写 FlinkSql
DROP table if exists datagen;
CREATE TABLE datagen (
a INT,
b varchar
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'number-of-rows' = '50'
);
DROP table if exists print_table;
CREATE TABLE print_table (
a INT,
b varchar
) WITH (
'connector'='print'
);
insert into print_table select a,b from datagen ;
切换执行模式并运行 FlinkSql 作业
观察控制台输出
跳转运维中心查看任务运行状况
Flink Jar 任务提交
上传任务 jar 包
在提交 Flink Jar 之前,先将任务 jar 包上传到 Dinky 资源目录,比如:
在数据开发中创建 FlinkSql 作业
编写任务提交代码
EXECUTE JAR WITH (
'uri'='rs:/app/WordCount.jar',
'main-class'='org.apache.flink.streaming.examples.wordcount.WordCount',
'args'='--input s3://<your-bucket-name>/<your-folder-name>/flinktestin/input.txt --output s3://xxxx/aaabb/flinktestout/',
'parallelism'='1'
);
在过程中,如果参数填写了 S3 的地址(例如参数里有 S3 地址),可能会遇到这个报错:
Could not find a file system implementation for scheme 's3'.
The scheme is directly supported by Flink through the following plugin(s):
flink-s3-fs-hadoop, flink-s3-fs-presto.
解决办法是:
# 拷贝 flink-s3-fs-hadoop jar到dinky lib目录
cp /usr/lib/flink/plugins/s3/flink-s3-fs-hadoop-1.17.1-amzn-1.jar lib/
# 重启dinky服务
cd ~/dinky/
bash auto.sh restart
Flink CDC 整库同步
Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS(CREATE DATABASE AS)作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 MySQL 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等,详细参考 Dinky 官方文档。
下面是 MySQLCDC 整库到 MySQL 的 Demo。
前置准备
- 请确保已经在 Flink/lib 和 dinky/extends 目录下放置了 Jdbc 的 Flink connector jar。如果提交模式为 Application/Per-Job,请确保 Jdbc connector jar 已经放置在 HDFS 中(/user/hadoop/flink/lib/)。
- 请确保已经在 Flink/lib 和 dinky/extends 目录下放置了 MySQL CDC 的 Flink connector jar。如果提交模式为 Application/Per-Job,请确保 MySQL CDC connector jar 已经放置在 HDFS 中(/user/hadoop/flink/lib/)。
- 如在两方启动后才进行放置上述 jar 包,请重启 Flink 和 Dinky 服务,或者使用 Dinky 中提供的 ADD CUSTOMJAR 功能进行加载。
- 依赖 jar 包:
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-guava/32.1.3-jre-19.0/flink-shaded-guava-32.1.3-jre-19.0.jar
wget https://repo1.maven.org/maven2/com/zaxxer/HikariCP/5.1.0/HikariCP-5.1.0.jar
wget https://repo1.maven.org/maven2/com/alibaba/druid/1.2.21/druid-1.2.21.jar
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.17.0/jackson-datatype-jsr310-2.17.0.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.1/flink-sql-connector-mysql-cdc-3.0.1.jar
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.17/1.6.1/flink-doris-connector-1.17-1.6.1.jar
将上述下载的依赖包上传到资源中心目录中,比如 rs:/lib/,或者将上述包放到 Dinky 服务器 dinky/extends 目录和 HSDS 目录/user/hadoop/flink/lib/中(根据集群配置中设定的目录放置)。
创建作业
编写作业代码
add customjar 'rs:/lib/flink-shaded-guava-32.1.3-jre-19.0.jar';
add customjar 'rs:/lib/HikariCP-5.1.0.jar';
add customjar 'rs:/lib/druid-1.2.21.jar';
add customjar 'rs:/lib/jackson-datatype-jsr310-2.17.0.jar';
add customjar 'rs:/lib/flink-sql-connector-mysql-cdc-3.0.1.jar';
add customjar 'rs:/lib/flink-doris-connector-1.17-1.6.1.jar';
EXECUTE CDCSOURCE cdc_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'bigdata',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '#{tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)
注意事项
- 该示例为将 mysql 整库同步到另一个 mysql 数据库,写入 test 库,表名前缀 test_,表名全小写,开启自动建表。
- 该示例参数中的 #{tableName} 为占位符,实际执行时会替换为实际表名,如 ods_products、ods_orders 等。
- 该示例 sink 中的各个参数均可根据实际情况进行调整,请按照 mysql 连接器官方文档进行配置。并请遵守整库同步的规范。
跳转运维中心查看任务运行状况
总结
本文介绍了 Dinky 作为一个开源的实时计算平台,如何与 Amazon EMR Flink 集成部署,为数据分析平台提供易用且强大的实时在线任务处理能力。Dinky 凭借其先进的架构设计、完备的功能模块和便捷的开发运维体验,能够很好地弥补 Amazon EMR 在任务提交、管理、监控等方面的复杂度,使得构建实时数据分析平台的整个过程更加顺畅高效。
无论是传统的 Flink SQL/JAR 任务,还是新兴的 CDC 实时数据同步需求,Dinky 均提供了出色的支持,开发者可以在其中无缝切换,大幅降低开发运维的复杂度。同时,其资源库、自定义 jar 包等辅助功能,也进一步增强了平台的可扩展性和适用性。
总的来说,Dinky 与 EMR Flink 的完美融合为企业构建现代化的实时数据分析平台提供了一条高效可行的技术路径,是数据平台现代化进程中的重要一环。相信通过本文档的指导,读者一定能够顺利部署和使用这一方案,开启实时数据分析之旅。
参考资料
https://dinky.org.cn/docs/next/get_started/overview
https://github.com/DataLinkDC/dinky
基于开源工具构建 EMR 数据分析平台系列文章
基于开源工具构建 EMR 数据分析平台(一)方案总体介绍
基于开源工具构建 EMR 数据分析平台(二)使用 Dinky 进行 Flink 任务开发、管理(本文)
本篇作者