亚马逊AWS官方博客
远程调度 EMR 集群的常见方式总结
1.简介
亚马逊云科技上的 EMR 大数据平台提供了计算存储分离的弹性架构(计算在虚拟机上,存储在s3上),基于此,常见的如流式处理,即时查询等作业可以单独运行在一个长期开启的EMR集群之中;而对于定时,跑批等业务可以运行在动态启停的EMR集群之中。对于后者来讲,由于其动态启停的特性,我们往往需要外置任务调度器于EMR集群之外。在上述的背景之下,我们会在接下来的文章之中对于当调度器外置之后,常见的远程调度EMR集群的方式,相应的优缺点,相关的例子等方面进行探讨。
2.常见调度方式总结
从大类上分,远程调度分为紧耦合方式和松耦合方式,其中紧耦合方式更适合调度长期运行的集群,而松耦合方式更加适用于动态启停的集群
- 紧耦合方式: 适合长期运行的集群
- Copy EMR Enviornment
- 即Copy对应集群EMR的相关配置到调度器节点上
- 优点
- 提供和在EMR master节点上执行的同样体验
- 缺点
- 相对紧耦合
- 需要用amazon linux2,否则很多依赖缺失需要大量的额外工作
- 如果有动态启停EMR需求的话,其脚本会相对复杂,并且涉及到资源的迁移,即迁移相关的环境变量到对应的调度器环境中
- Copy EMR Enviornment
- 松耦合方式: 适合运行型模式更为灵活的集群
- Apache Livy
- 通过Livy部署在EMR之上,提供http restful的接口
- 优点
- 简单易用
- 解耦调度器与EMR集群
- 缺点
- 需要修改任务提交的方式
- 更新相对较慢,从笔者开始接触起,其版本始终保持在0.7的非正式大版本上
- 样例参考:https://aws.amazon.com/cn/blogs/china/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy/
- EmrSteps API
- 通过AWS 原生Step API进行调用
- 优点
- 简单易用,对于单步执行的任务,可以一条命令完成创建,执行到关闭的全过程
- Apache Livy
-
-
-
- 解耦调度器与EMR集群
- 配合AWS原生调度器Step function十分易用
- 缺点
- 异步执行,所以需要对应的轮训机制确保执行成功
- Step 需要顺序执行,不能多任务并行
-
-
-
- SSH
- 即在具体的job中ssh到EMR master节点执行对应的操作
- 优点
- 相对简单
- 解耦调度器与EMR集群
- 缺点
- 需要执行的jar包需要提前扔到EMR master上,在实际环境中Azkaban往往处于内网的机器,所以传包需要先传到跳板机,再放到对应EMR机器,可以适当考虑使用S3FS简化流程
- SSH
3.基于Azkaban进行远程调度的样例
在接下来的例子中,我们会分别展示基于Azkaban使用SSH,EMR API进行远程EMR集群的启动,任务运行,错误处理,集群关闭的完整流程。下图展示了整体的流程示意
3.1 SSH的方式
前提条件
- 对应EMR的默认安全组放行调度器所在的安全组
- 对应EMR_DefaultRole和EMR_EC2_DefaultRole存在
- 确保在Azkaban机器上存在对应集群的私钥
- 下载jq工具
- Azkaban机器要附加足够的权限
具体配置
登录到Azkaban机器,建立如下文件目录
具体三个job的定义如下,整个flow的依赖定义在function.flow中
创建集群脚本: create_cluster.sh 下述参数部分需要按照自身情况进行定义
- KeyName
- SubnetIds
执行任务脚本: spark_task.sh 下述参数部分需要按照自身情况进行定义
- zhy-key.pem
删除集群: delete_cluster.sh
错误处理:
执行示意图
执行任务之前,需要配置如下的执行规则,否则错误处理逻辑不会触发
当前流程执行图历史执行记录查看
3.2 EmrStep API 模式
前提
- 对应EMR的默认安全组放行调度器所在的安全组
- 对应EMR_DefaultRole和EMR_EC2_DefaultRole存在
- Azkaban机器要有足够的EMR权限
具体步骤
此种方式与SSH方式上基本的逻辑相同,主要区别为具体EMR 任务执行的逻辑不同
创建集群脚本: create_cluster.sh 下述部分一定要按照自身情况进行定义
- KeyName
- SubnetIds
执行任务脚本: spark_task.sh
删除集群逻辑:delete_cluster.sh
4.0 其他注意事项
- 在调试Azkaban job的过程中, type: command, command: sh create_cluster.sh 类型的任务只要在shell环境中执行成功就算成功,部分情况下捕捉不到应用失败导致的错误,可以用如下的方式进行解决
- 本机执行,以及远程ssh的大部分错误可以通过脚本中添加set -e进行捕捉,捕捉后的错误处理流程可以依据自身情况进行改动
- 集群动态启停,对于出现错误的任务无法查日志,那么可以把spark任务的提交方式改为client,通过ssh隧道写回,那么在azkaban处也可以看到详细的log了
- 注意设置重试,emr同样的任务,同样的配置可能由于各种情况偶然性失败
- Azkaban在调度的过程中,参数只能在相邻的job且是job文件中才能传递,相隔的,脚本中均无法传递参数
- 相邻job对应的脚本中通过写默认${JOB_OUTPUT_PROP_FILE},读${JOB_PROP_FILE}进行传递
- 非相邻job自定义临时文件进行存储
- 另外关于Apache livy 的调用方式可以参考如下博客:https://aws.amazon.com/cn/blogs/china/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy/