1. Introduction
The EMR big data platform on Amazon Web Services provides an elastic architecture that separates compute from storage (compute runs on virtual machines, storage lives on S3). Building on this, workloads such as stream processing and ad-hoc queries can run on a single long-running EMR cluster, while scheduled and batch jobs can run on clusters that are started and stopped on demand. For the latter, the dynamic start/stop pattern means the task scheduler usually has to live outside the EMR cluster. Against this background, this article explores the common ways to schedule jobs on a remote EMR cluster once the scheduler is externalized, their pros and cons, and worked examples.
2. Common Remote Scheduling Approaches
Broadly speaking, remote scheduling approaches fall into two categories: tightly coupled and loosely coupled. Tightly coupled approaches are better suited to long-running clusters, while loosely coupled approaches suit clusters that are started and stopped dynamically.
- Tightly coupled: suitable for long-running clusters
  - Copy EMR Environment
    - Copy the relevant EMR configuration from the target cluster onto the scheduler node
    - Pros
      - Jobs can invoke the cluster clients (spark-submit, hive, and so on) directly on the scheduler node, just as on the master node
    - Cons
      - Relatively tight coupling
      - The scheduler node needs to run Amazon Linux 2; otherwise many missing dependencies require substantial extra work
      - If clusters are started and stopped dynamically, the scripts become fairly complex and involve migrating resources, i.e. moving the relevant environment variables into the scheduler environment
- Loosely coupled: suitable for clusters with more flexible operating modes
  - Apache Livy
    - Livy is deployed on the EMR cluster and exposes an HTTP RESTful interface
    - Pros
      - Fully decouples the scheduler from the cluster over plain HTTP
    - Cons
      - Submission is asynchronous, so job status must be polled, and Livy is one more service to operate
  - EmrSteps API
    - Jobs are submitted through the AWS-native Step API
    - Pros
      - Simple and easy to use; for a single-step task, one command covers the whole lifecycle from cluster creation through execution to shutdown
        aws emr create-cluster --name "Add Spark Step Cluster" --release-label emr-5.30.0 --applications Name=Spark \
          --ec2-attributes KeyName=myKey --instance-type m5.xlarge --instance-count 3 \
          --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10] \
          --use-default-roles --auto-terminate
      - Decouples the scheduler from the EMR cluster
      - Works very well with the AWS-native orchestrator, Step Functions
    - Cons
      - Execution is asynchronous, so a polling mechanism is needed to confirm success
      - Steps run sequentially; multiple tasks cannot run in parallel
  - SSH
    - The job SSHes into the EMR master node and runs the required commands there
    - Pros
      - Synchronous and direct; commands behave exactly as if run on the master node
    - Cons
      - The jar to execute must be shipped to the EMR master in advance. In practice the Azkaban host often sits on a private network, so the package first goes through a bastion host before reaching the EMR node; S3FS is worth considering to simplify this flow.
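To make the Apache Livy option above concrete, here is a minimal sketch of submitting the SparkPi example through Livy's REST batch API. Livy on EMR listens on port 8998; `LIVY_HOST` is a placeholder for your master node's DNS name, and the curl calls are commented out because they need a live cluster.

```shell
#!/bin/sh
## Sketch: submitting a Spark batch job through Livy's REST API.
## LIVY_HOST is a placeholder -- substitute your EMR master's DNS name.
LIVY_HOST=${LIVY_HOST:-localhost}

## Livy batch payload: jar, main class and arguments for spark-submit
payload='{"file": "local:/usr/lib/spark/examples/jars/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["10"]}'
echo "$payload"

## Submit the batch and note the returned id (requires a running cluster):
# curl -s -X POST -H 'Content-Type: application/json' \
#      -d "$payload" "http://$LIVY_HOST:8998/batches"
## Then poll GET http://$LIVY_HOST:8998/batches/<id> until "state"
## becomes "success" or "dead" -- submission is asynchronous.
```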
3. Remote Scheduling Examples Based on Azkaban
The examples below walk through the complete flow of starting a remote EMR cluster, running a job, handling errors, and shutting the cluster down from Azkaban, first over SSH and then through the EMR API. The figure below illustrates the overall flow.
3.1 The SSH Approach
Prerequisites
- The EMR cluster's default security group allows access from the scheduler's security group
- The EMR_DefaultRole and EMR_EC2_DefaultRole roles exist
- The private key of the target cluster is present on the Azkaban host
- The jq tool is installed
- The Azkaban host has sufficient IAM permissions attached
Configuration
Log in to the Azkaban host and create the following files:
[ec2-user@ip-10-0-1-51 flow]$ ls -l
total 36
-rw-rw-r-- 1 ec2-user ec2-user 27 Jun 18 10:06 conditional_flow.project
-rw-rw-r-- 1 ec2-user ec2-user 42 May 18 07:48 create_cluster.job
-rw-rw-r-- 1 ec2-user ec2-user 1671 May 19 05:42 create_cluster.sh
-rw-rw-r-- 1 ec2-user ec2-user 267 Jun 18 03:21 delete_cluster.sh
-rw-rw-r-- 1 ec2-user ec2-user 304 Jun 20 10:03 error_handler.sh
-rw-rw-r-- 1 ec2-user ec2-user 543 Jun 18 10:01 function.flow
-rw-rw-r-- 1 ec2-user ec2-user 919 Jun 18 07:18 spark_task.sh
The three jobs are defined as follows; the dependencies of the whole flow are declared in function.flow:
nodes:
- name: create_cluster
type: command
config:
command: sh create_cluster.sh
- name: spark_job
type: command
dependsOn:
- create_cluster
config:
command: sh spark_task.sh
condition: all_success
- name: delete_cluster
type: command
dependsOn:
- spark_job
config:
command: sh delete_cluster.sh
condition: all_success
- name: error_handle
type: command
dependsOn:
- spark_job
config:
command: sh error_handler.sh
condition: one_failed
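For Azkaban to parse Flow 2.0 YAML definitions such as function.flow, the conditional_flow.project file shown in the listing above must declare the flow version. Its entire content is the single line below:

```yaml
azkaban-flow-version: 2.0
```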
Cluster creation script create_cluster.sh (adjust the parameters below to your own environment):
#!/bin/sh
## Create cluster with instance fleet
export clusterId=$(aws emr create-cluster \
--name emr-test \
--tags name=emr-test \
--applications Name=Hadoop Name=Hive Name=Spark Name=Livy Name=Hue \
--release-label emr-5.29.0 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=zhy-key,InstanceProfile=EMR_EC2_DefaultRole,SubnetIds=['subnet-2cf25a45'] \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m4.large}'] \
InstanceFleetType=CORE,TargetSpotCapacity=4,TargetOnDemandCapacity=4,InstanceTypeConfigs=['{InstanceType=m4.large,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=m5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r4.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=120, TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
--region cn-northwest-1 | grep "ClusterId" | awk -F ':' '{print $2}' | tr -d "," | tr -d "\"" | tr -d " ")
## waiting for ready
aws emr wait cluster-running --cluster-id $clusterId --region cn-northwest-1
## get cluster dns Name
export host_dns=$(aws emr describe-cluster --cluster-id $clusterId --region cn-northwest-1 | jq -r '.Cluster.MasterPublicDnsName')
## deliver dns name to next step; note the escaping format here
echo '{"dns": "'"$host_dns"'", "clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}
# in case failed
echo '{"dns": "'"$host_dns"'", "clusterId": "'"$clusterId"'"}' > /home/ec2-user/cluster_info.json
echo "cluster created!"
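The quoting in the two echo lines above is easy to get wrong. Since jq is already a prerequisite, a less error-prone sketch is to let jq build the JSON; the values below are placeholders standing in for the real describe-cluster outputs.

```shell
#!/bin/sh
## Placeholder values standing in for the real cluster outputs
host_dns="ec2-10-0-1-51.cn-northwest-1.compute.amazonaws.com.cn"
clusterId="j-XXXXXXXXXXXXX"

## jq --arg does all quoting and escaping; -c prints one compact line
cluster_info=$(jq -n -c --arg dns "$host_dns" --arg clusterId "$clusterId" \
    '{dns: $dns, clusterId: $clusterId}')
echo "$cluster_info"
```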
Task execution script spark_task.sh (adjust the parameters below to your own environment):
[ec2-user@ip-10-0-1-51 flow]$ cat spark_task.sh
#!/bin/sh
## catch error
set -e
## get hostname from previous job
host_dns=$(grep 'dns' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')
## clear host
cat /dev/null > /home/ec2-user/.ssh/known_hosts
## get the fingerprint first
ssh-keyscan -H $host_dns >> /home/ec2-user/.ssh/known_hosts
## execute remotely
ssh -i /home/ec2-user/zhy-key.pem hadoop@$host_dns << remotessh
echo "------------------------------------------------------------------- running spark job -------------------------------------------------------------------"
spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi --executor-cores 1 --num-executors 2 --executor-memory 1024M /usr/lib/spark/examples/jars/spark-examples.jar
exit
remotessh
## Deliver clusterId to job of delete_cluster
echo '{"clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}
Cluster deletion script: delete_cluster.sh
#!/bin/sh
## get clusterId
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')
## shutdown cluster
aws emr terminate-clusters --cluster-ids $clusterId
## Wait until termination finishes
aws emr wait cluster-terminated --cluster-id $clusterId
Error handling: error_handler.sh
#!/bin/sh
text_file=/home/ec2-user/cluster_info.json
## get clusterId
clusterId=$(cat $text_file | jq -r .clusterId)
echo $clusterId
## shutdown cluster
aws emr terminate-clusters --cluster-ids $clusterId
## Wait until termination finishes
aws emr wait cluster-terminated --cluster-id $clusterId
Execution overview
Before running the flow, the failure-handling rule below must be configured; otherwise the error-handling logic will not trigger.
Current flow execution graph
Viewing historical executions
3.2 The EmrStep API Approach
Prerequisites
- The EMR cluster's default security group allows access from the scheduler's security group
- The EMR_DefaultRole and EMR_EC2_DefaultRole roles exist
- The Azkaban host has sufficient EMR permissions
Steps
This approach follows the same overall logic as the SSH approach; the main difference is in how the EMR task itself is executed.
[ec2-user@ip-10-0-1-51 flow]$ ls
create_cluster.job create_cluster.sh delete_cluster.job delete_cluster.sh spark_task.job spark_task.sh
[ec2-user@ip-10-0-1-51 flow]$ cat create_cluster.job
type=command
command=sh create_cluster.sh
[ec2-user@ip-10-0-1-51 flow]$ cat spark_task.job
type=command
command=sh spark_task.sh
dependencies=create_cluster
[ec2-user@ip-10-0-1-51 flow]$ cat delete_cluster.job
type=command
command=sh delete_cluster.sh
dependencies=spark_task
Cluster creation script create_cluster.sh (be sure to adjust the parameters below to your own environment):
#!/bin/sh
## Create cluster with instance fleet
export clusterId=$(aws emr create-cluster \
--name emr-test \
--tags name=emr-test \
--applications Name=Hadoop Name=Hive Name=Spark Name=Livy Name=Hue \
--release-label emr-5.29.0 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=zhy-key,InstanceProfile=EMR_EC2_DefaultRole,SubnetIds=['subnet-2cf25a45'] \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m4.large}'] \
InstanceFleetType=CORE,TargetSpotCapacity=4,TargetOnDemandCapacity=4,InstanceTypeConfigs=['{InstanceType=m4.large,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=m5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r4.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=120, TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
--region cn-northwest-1 | grep "ClusterId" | awk -F ':' '{print $2}' | tr -d "," | tr -d "\"" | tr -d " ")
## waiting for ready
aws emr wait cluster-running --cluster-id $clusterId --region cn-northwest-1
## get cluster dns Name
export host_dns=$(aws emr describe-cluster --cluster-id $clusterId --region cn-northwest-1 | jq -r '.Cluster.MasterPublicDnsName')
## deliver dns name to next step; note the escaping format here
echo '{"dns": "'"$host_dns"'", "clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}
echo "cluster created!"
Task execution script: spark_task.sh
[ec2-user@ip-10-0-1-51 flow]$ cat spark_task.sh
#!/bin/sh
## get clusterId from previous job
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')
## add a step to emr
stepId=$(aws emr add-steps --cluster-id $clusterId --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10] | jq -r .StepIds | jq '.[0]' | tr -d "\"" | tr -d " ")
## poll emr step status until the step finishes
aws emr wait step-complete --cluster-id $clusterId --step-id $stepId
## Deliver clusterId to job of delete_cluster
echo '{"clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}
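Note that `aws emr wait step-complete` polls on a fixed schedule and gives up after a bounded number of attempts, so very long steps may outlive the waiter. A sketch of a manual polling loop over describe-step, with explicit control of interval and timeout, is below; `wait_for_step` and its defaults are our own naming, not part of the AWS CLI.

```shell
#!/bin/sh
## Return the current state of an EMR step (PENDING, RUNNING,
## COMPLETED, FAILED, CANCELLED, INTERRUPTED, ...).
get_step_state() {
  aws emr describe-step --cluster-id "$1" --step-id "$2" \
    --query 'Step.Status.State' --output text
}

## Poll until the step reaches a terminal state or the timeout expires.
## Usage: wait_for_step <clusterId> <stepId> [timeout_sec] [interval_sec]
wait_for_step() {
  clusterId=$1; stepId=$2; timeout=${3:-7200}; interval=${4:-30}
  elapsed=0
  while [ "$elapsed" -lt "$timeout" ]; do
    state=$(get_step_state "$clusterId" "$stepId")
    case "$state" in
      COMPLETED) return 0 ;;
      FAILED|CANCELLED|INTERRUPTED) return 1 ;;
    esac
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
  return 1  ## timed out
}
```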
Cluster deletion logic: delete_cluster.sh
#!/bin/sh
## get clusterId
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')
## shutdown cluster
aws emr terminate-clusters --cluster-ids $clusterId
## Wait until termination finishes
aws emr wait cluster-terminated --cluster-id $clusterId
4. Other Notes
About the author