亚马逊AWS官方博客

远程调度 EMR 集群的常见方式总结

1.简介

亚马逊云科技上的 EMR 大数据平台提供了计算存储分离的弹性架构(计算在虚拟机上,存储在s3上),基于此,常见的如流式处理,即时查询等作业可以单独运行在一个长期开启的EMR集群之中;而对于定时,跑批等业务可以运行在动态启停的EMR集群之中。对于后者来讲,由于其动态启停的特性,我们往往需要外置任务调度器于EMR集群之外。在上述的背景之下,我们会在接下来的文章之中对于当调度器外置之后,常见的远程调度EMR集群的方式,相应的优缺点,相关的例子等方面进行探讨。

2.常见调度方式总结

从大类上分,远程调度分为紧耦合方式和松耦合方式,其中紧耦合方式更适合调度长期运行的集群,而松耦合方式更加适用于动态启停的集群

  • 紧耦合方式: 适合长期运行的集群
    • Copy EMR Enviornment
      • 即Copy对应集群EMR的相关配置到调度器节点上
      • 优点
        • 提供和在EMR master节点上执行的同样体验
      • 缺点
        • 相对紧耦合
        • 需要用amazon linux2,否则很多依赖缺失需要大量的额外工作
        • 如果有动态启停EMR需求的话,其脚本会相对复杂,并且涉及到资源的迁移,即迁移相关的环境变量到对应的调度器环境中
  • 松耦合方式: 适合运行型模式更为灵活的集群
aws emr create-cluster --name "Add Spark Step Cluster" --release-label emr-5.30.0 --applications Name=Spark \ --ec2-attributes KeyName=myKey --instance-type m5.xlarge --instance-count 3 \ --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10] --use-default-roles --auto-terminate
        • 解耦调度器与EMR集群
        • 配合AWS原生调度器Step function十分易用
      • 缺点
        • 异步执行,所以需要对应的轮训机制确保执行成功
        • Step 需要顺序执行,不能多任务并行
    • SSH
      • 即在具体的job中ssh到EMR master节点执行对应的操作
      • 优点
        • 相对简单
        • 解耦调度器与EMR集群
      • 缺点
        • 需要执行的jar包需要提前扔到EMR master上,在实际环境中Azkaban往往处于内网的机器,所以传包需要先传到跳板机,再放到对应EMR机器,可以适当考虑使用S3FS简化流程

3.基于Azkaban进行远程调度的样例

在接下来的例子中,我们会分别展示基于Azkaban使用SSH,EMR API进行远程EMR集群的启动,任务运行,错误处理,集群关闭的完整流程。下图展示了整体的流程示意

3.1 SSH的方式

前提条件

  • 对应EMR的默认安全组放行调度器所在的安全组
  • 对应EMR_DefaultRole和EMR_EC2_DefaultRole存在
  • 确保在Azkaban机器上存在对应集群的私钥
  • 下载jq工具
  • Azkaban机器要附加足够的权限

具体配置

登录到Azkaban机器,建立如下文件目录

[ec2-user@ip-10-0-1-51 flow]$ ls -l
total 36
-rw-rw-r-- 1 ec2-user ec2-user   27 Jun 18 10:06 conditional_flow.project
-rw-rw-r-- 1 ec2-user ec2-user   42 May 18 07:48 create_cluster.job
-rw-rw-r-- 1 ec2-user ec2-user 1671 May 19 05:42 create_cluster.sh
-rw-rw-r-- 1 ec2-user ec2-user 267 Jun 18 03:21 delete_cluster.sh
-rw-rw-r-- 1 ec2-user ec2-user 304 Jun 20 10:03 error_handler.sh
-rw-rw-r-- 1 ec2-user ec2-user 543 Jun 18 10:01 function.flow
-rw-rw-r-- 1 ec2-user ec2-user 919 Jun 18 07:18 spark_task.sh

具体三个job的定义如下,整个flow的依赖定义在function.flow中

nodes:
- name: create_cluster
  type: command
  config:
    command: sh create_cluster.sh

- name: spark_job
  type: command
  dependsOn:
    - create_cluster
  config:
    command: sh spark_task.sh
  condition: all_success

- name: delete_cluster
  type: command
  dependsOn:
    - spark_job
  config:
    command: sh delete_cluster.sh
  condition: all_success

- name: error_handle
  type: command
  dependsOn:
    - spark_job
  config:
    command: sh error_handler.sh
  condition: one_failed

创建集群脚本: create_cluster.sh 下述参数部分需要按照自身情况进行定义

  • KeyName
  • SubnetIds
#!/bin/sh

## Create cluster with instance fleet
export clusterId=$(aws emr create-cluster \
--name emr-test \
--tags name=emr-test \
--applications Name=Hadoop Name=Hive Name=Spark Name=Livy Name=Hue Name=Hadoop \
--release-label emr-5.29.0 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=zhy-key,InstanceProfile=EMR_EC2_DefaultRole,SubnetIds=['subnet-2cf25a45'] \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m4.large}'] \
InstanceFleetType=CORE,TargetSpotCapacity=4,TargetOnDemandCapacity=4,InstanceTypeConfigs=['{InstanceType=m4.large,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=m5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r4.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=120, TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
--region cn-northwest-1 | grep "ClusterId" | awk -F ':' '{print $2}' | tr -d "," | tr -d "\"" | tr -d " ")

## waiting for ready
aws emr wait cluster-running --cluster-id $clusterId --region cn-northwest-1

## get cluster dns Name 
export host_dns=$(aws emr describe-cluster --cluster-id $clusterId --region cn-northwest-1 | jq -r '.Cluster.MasterPublicDnsName')

## deliver dns name to next step 此处注意转义的格式
echo '{"dns": "'"$host_dns"'", "clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}

# in case failed
echo '{"dns": "'"$host_dns"'", "clusterId": "'"$clusterId"'"}' > /home/ec2-user/cluster_info.json

echo "cluster created!

执行任务脚本: spark_task.sh 下述参数部分需要按照自身情况进行定义

  • zhy-key.pem
[ec2-user@ip-10-0-1-51 flow]$ cat spark_task.sh
#!/bin/sh

## catch error
set -e

## get hostname from previous job
host_dns=$(grep 'dns' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')

## clear host
cat /dev/null > /home/ec2-user/.ssh/known_hosts

## get the fingerprint first
ssh-keyscan -H $host_dns >> /home/ec2-user/.ssh/known_hosts

## execute remotely
ssh -i /home/ec2-user/zhy-key.pem hadoop@$host_dns << remotessh

echo "-------------------------------------------------------------------执行程序spark job-------------------------------------------------------------------"
spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi --executor-cores 1 --num-executors 2 --executor-memory 1024M /usr/lib/spark/examples/jars/spark-examples.jar
exit
remotessh
## Deliver clusterId to job of delete_cluster 
echo '{"clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}

删除集群: delete_cluster.sh

#!/bin/sh
## get clusterId
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')
## shutdown cluster
aws emr terminate-clusters --cluster-ids $clusterId
## Wait unitl termination finished
aws emr wait cluster-terminated --cluster-id $clusterId

错误处理:

#!/bin/sh
text_file=/home/ec2-user/cluster_info.json
## get clusterId
clusterId=$(cat $text_file | jq -r .clusterId)

echo $clusterId
## shutdown cluster
aws emr terminate-clusters --cluster-ids $clusterId
## Wait unitl termination finished
aws emr wait cluster-terminated --cluster-id $clusterId

执行示意图

执行任务之前,需要配置如下的执行规则,否则错误处理逻辑不会触发

当前流程执行图

历史执行记录查看

3.2 EmrStep API 模式

前提

  • 对应EMR的默认安全组放行调度器所在的安全组
  • 对应EMR_DefaultRole和EMR_EC2_DefaultRole存在
  • Azkaban机器要有足够的EMR权限

具体步骤

此种方式与SSH方式上基本的逻辑相同,主要区别为具体EMR 任务执行的逻辑不同

[ec2-user@ip-10-0-1-51 flow]$ ls
create_cluster.job create_cluster.sh delete_cluster.job delete_cluster.sh spark_task.job spark_task.sh
[ec2-user@ip-10-0-1-51 flow]$ cat create_cluster.job
type=command
command=sh create_cluster.sh
[ec2-user@ip-10-0-1-51 flow]$ cat spark_task.job
type=command
command=sh spark_task.sh
dependencies=create_cluster
[ec2-user@ip-10-0-1-51 flow]$ cat delete_cluster.job
type=command
command=sh delete_cluster.sh
dependencies=spark_task

创建集群脚本: create_cluster.sh 下述部分一定要按照自身情况进行定义

  • KeyName
  • SubnetIds
#!/bin/sh

## Create cluster with instance fleet
export clusterId=$(aws emr create-cluster \
--name emr-test \
--tags name=emr-test \
--applications Name=Hadoop Name=Hive Name=Spark Name=Livy Name=Hue Name=Hadoop \
--release-label emr-5.29.0 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=zhy-key,InstanceProfile=EMR_EC2_DefaultRole,SubnetIds=['subnet-2cf25a45'] \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m4.large}'] \
InstanceFleetType=CORE,TargetSpotCapacity=4,TargetOnDemandCapacity=4,InstanceTypeConfigs=['{InstanceType=m4.large,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=m5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r5.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}','{InstanceType=r4.xlarge,BidPriceAsPercentageOfOnDemandPrice=100,WeightedCapacity=4}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=120, TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
--region cn-northwest-1 | grep "ClusterId" | awk -F ':' '{print $2}' | tr -d "," | tr -d "\"" | tr -d " ")

## waiting for ready
aws emr wait cluster-running --cluster-id $clusterId --region cn-northwest-1

## get cluster dns Name 
export host_dns=$(aws emr describe-cluster --cluster-id $clusterId --region cn-northwest-1 | jq -r '.Cluster.MasterPublicDnsName')

## deliver dns name to next step 此处注意转义的格式
echo '{"dns": "'"$host_dns"'", "clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}

echo "cluster created!

执行任务脚本: spark_task.sh

[ec2-user@ip-10-0-1-51 flow]$ cat spark_task.sh
#!/bin/sh

## get clusterId from previous job
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')

## add a step to emr
stepId=$(aws emr add-steps --cluster-id $clusterId --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10] | jq -r .StepIds | jq '.[0]' | tr -d "\"" | tr -d " ")

## polling emr status unitl step finished 
aws emr wait step-complete --cluster-id $clusterId --step-id $stepId

## Deliver clusterId to job of delete_cluster 
echo '{"clusterId": "'"$clusterId"'"}' >> ${JOB_OUTPUT_PROP_FILE}

删除集群逻辑:delete_cluster.sh

#!/bin/sh

## get clusterId
clusterId=$(grep 'clusterId' ${JOB_PROP_FILE} | awk -F '=' '{print $2}')

## shutdown cluster
aws emr terminate-clusters --cluster-ids $clusterId

## Wait unitl termination finished
aws emr wait cluster-terminated --cluster-id $clusterId

4.0 其他注意事项

  • 在调试Azkaban job的过程中, type: command, command: sh create_cluster.sh 类型的任务只要在shell环境中执行成功就算成功,部分情况下捕捉不到应用失败导致的错误,可以用如下的方式进行解决
    • 本机执行,以及远程ssh的大部分错误可以通过脚本中添加set -e进行捕捉,捕捉后的错误处理流程可以依据自身情况进行改动
    • 集群动态启停,对于出现错误的任务无法查日志,那么可以把spark任务的提交方式改为client,通过ssh隧道写回,那么在azkaban处也可以看到详细的log了
  • 注意设置重试,emr同样的任务,同样的配置可能由于各种情况偶然性失败
  • Azkaban在调度的过程中,参数只能在相邻的job且是job文件中才能传递,相隔的,脚本中均无法传递参数
    • 相邻job对应的脚本中通过写默认${JOB_OUTPUT_PROP_FILE},读${JOB_PROP_FILE}进行传递
    • 非相邻job自定义临时文件进行存储
  • 另外关于Apache livy 的调用方式可以参考如下博客:https://aws.amazon.com/cn/blogs/china/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy/

本篇作者

尹振宇

亚马逊云科技解决方案架构师,负责基于亚马逊云科技云平台的解决方案咨询和设计,尤其在无服务器领域和微服务领域有着丰富的实践经验。