亚马逊AWS官方博客

Amazon EMR 集群的成本优化实践

一、背景介绍

在大数据处理领域,AWS EMR(Elastic MapReduce)作为一个完全托管的大数据平台,为企业提供了强大的数据处理能力。然而,在实际应用中,很多企业面临着资源使用效率与成本优化的挑战。特别是对于那些仅在特定时间段(如每天的凌晨时分)运行批处理任务的场景,维持一个 24 小时运行的 EMR 集群往往会造成大量资源浪费。

为了优化成本,许多企业选择采用定时创建和销毁 EMR 集群的策略。这种方案虽然能够显著降低运营成本,但同时也带来了新的技术挑战。最突出的问题是,每次创建新集群时,集群节点都会被分配新的 IP 地址(包括公网 IP 和私有 IP),这导致了两个主要困扰:首先,外部系统难以可靠地与 EMR 集群建立连接,因为连接端点始终在变化;其次,依赖固定 IP 地址的自动化脚本和调度系统需要不断更新配置,增加了维护的复杂度。

尽管部分场景下,客户可以使用 Amazon EMR Serverless 来替换 Amazon EMR,支撑任务集中时间段运行的场景。但如果客户有集成第三方开源软件的诉求时,使用 Amazon EMR 更为灵活便利。并且,Amazon EMR Serverless 是异步的方式提交任务,面对离线任务的任务调度(DAG)时,支持难免受限。

此外,虽然 AWS 提供了 EMR Steps 功能来支持瞬态集群的任务执行,但这种方式在处理复杂的工作流程时显得力不从心,特别是在需要处理复杂的任务调度(DAG)时,无法满足企业的实际需求。因此,我们需要一个更完善的解决方案,既能实现成本优化,又能保持 IP 地址的稳定性,从而确保任务调度和系统集成的可靠性。

二、解决方案介绍

本文将详细介绍如何通过弹性网卡(Elastic Network Interface)技术结合自动化脚本,来实现 EMR 集群的定时启停,同时保持 IP 地址的固定性,为企业提供一个既经济又可靠的大数据处理解决方案。

如上图所示,本解决方案主要有三个核心部分:

  • 创建集群:基于集群的模板,使用 AWS CLI 自动化的创建集群。
  • 附加网卡:事先创建好一个弹性网卡,每次集群启动后,就进行附加操作。以保留公有 IP 、私有 IP 以及安全组不变。
  • 关闭集群:基于 AWS CLI 自动关闭集群。

三、解决方案部署

准备集群模板

要创建一个复杂配置的集群,很难直接基于 AWS CLI 的命令文档,人工写出命令及参数配置,既不方便也容易出错。本方案中,将使用 Amazon EMR Console页面的“Clone In AWS CLI”的功能,生成一个集群模板。

首先,需要按照正常流程,使用 Console 页面创建一个 Amazon EMR 集群,并按实际需要在 Console 页面上进行各类配置,包括但不限于集群本身的参数配置(如 VPC 网络配置,集群软件参数配置等),以及通过引导脚本给集群预装一些集成软件(比如 Kyuubi)。下图是一个在 Console 上创建集群的示例。

创建集群的详细配置,这里就不展开了。下图是一个创建好的集群详情页面,我们可以通过右上角的“Clone In AWS CLI”按钮,获取集群创建的克隆模板,本质就是一个基于集群可视化配置,生成的一个 AWS CLI 命令。

拷贝上图中的 CLI 命令,记录下来,后续步骤会使用。

准备弹性网卡

为了解决每次创建集群,主节点 IP 都会变化的问题,我们需要提前创建一个弹性网卡,待集群创建好,附加到集群主节点上。考虑到本方案对应的场景中,集群每天重新创建,通常只保留几个小时时间,主节点出现问题的概念相对较小,所以一般创建的 Amazon EMR 集群只设置了 单主节点(对于长时间运行的集群或者期望高可用的场景,仍然建议开启集群高可用的特性,创建 3 个主节点)。因此,只需要事前准备一个弹性网卡即可。

进入 EC2 页面,点击“网络接口”菜单,创建一个网络接口。在创建页面中的子网、安全组设置,请和上面创建 Amazon EMR 步骤中的主节点子网与安全组保持一致。

创建完成后,如下图所示,记录网络接口的 ID,后续步骤会用到。

流程自动化

在完成了前面两个步骤的准备后,我们就需要将整个流程,使用调度引擎,进行自动化实现。若实际场景中没有使用专门的调度引擎,也可以考虑使用 Amazon EventBridge 。接下来,我们以 DolphinScheduler 这款开源调度引擎为例,来做演示。

首先,在 DolphinScheduler 中创建一个顶层流程,如下图所示。

  • 在“StartEMR”节点,基于前面步骤中的 CLI 命令模板,创建 Amazon EMR 集群,并附加前面准备的网络接口。
  • “workflow”节点,本身是一个子流程节点,可以关联真实的工作流程,并在其中定义复杂的 DAG 。
  • “StopEMR” 节点,就是在 DAG 中的内容执行完成后,终止 Amazon EMR 集群。

当然,也可以把“StartEMR”和“StopEMR” 节点,各自单独作为定时调度的工作流程,比如每天凌晨 1:00 执行“StartEMR”,凌晨 6:00 执行“StopEMR”。具体看客户的实际需求而定。

“StartEMR”节点的脚本如下,使用前面生成的 CLI 命令模板,替换下面$()中的内容,使用前面创建网络接口的 ID,替换下面的 eni_id:

CURRENT_DATE=$(date "+%Y-%m-%d %H:%M")
is_skip_existing_cluster=$1
# 使用 AWS CLI 命令检查当前活跃的 EMR 集群状态
existing_cluster=$(aws emr list-clusters --active --query 'Clusters[0].Id' --output text)
# 如果已存在 EMR 集群,直接返回
if [ -n "$existing_cluster" ] && [ "$existing_cluster" != "None" ] && [ $is_skip_existing_cluster == 0 ]; then
echo "An existing EMR cluster already exists"
exit 0
fi
#使用前面生成的 CLI命令模板,替换下面$()中的内容
result=$(aws emr create-cluster \
 --name "create-table-${CURRENT_DATE}" \
 --log-uri "s3://aws-logs-XXXXX-ap-southeast-1/elasticmapreduce" \
 --release-label "emr-6.10.1" \
 --service-role "arn:aws:iam:: XXXX:role/EMR_DefaultRole" \
 --managed-scaling-policy '{"ComputeLimits":{"UnitType":"Instances","MinimumCapacityUnits":2,"MaximumCapacityUnits":18,"MaximumOnDemandCapacityUnits":2,"MaximumCoreCapacityUnits":2}}' \
 --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","EmrManagedMasterSecurityGroup":"sg-04af1c7e454045f08","EmrManagedSlaveSecurityGroup":"sg-0921f099344f277df","KeyName":"emr","AdditionalMasterSecurityGroups":[],"AdditionalSlaveSecurityGroups":[],"SubnetId":"subnet-0f3fdffdea097d90f"}' \
 --applications Name=Hadoop Name=Hive Name=JupyterEnterpriseGateway Name=JupyterHub Name=Livy Name=Presto Name=Spark Name=Tez Name=ZooKeeper \
 --configurations '[{"Classification":"hive-site","Properties":{"hive.server2.idle.operation.timeout":"86400s","hive.server2.idle.session.timeout":"86400s","hive.server2.thrift.worker.keepalive.time":"120s","hive.blobstore.use.output-committer":"true","hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}},{"Classification":"tez-site","Properties":{"tez.session.am.dag.submit.timeout.secs":"600"}},{"Classification":"hive-env","Properties":{},"Configurations":[{"Classification":"export","Properties":{"HIVE_SERVER2_HEAPSIZE":"8192","HIVE_METASTORE_HEAPSIZE":"2048","HADOOP_HEAPSIZE":"2048"}}]},{"Classification":"hdfs-site","Properties":{"dfs.replication":"2","file.replication":"2"}},{"Classification":"spark-defaults","Properties":{"spark.history.fs.cleaner.enabled":"true","spark.history.fs.cleaner.interval":"10m","spark.history.fs.cleaner.maxAge":"1h"}},{"Classification":"spark-hive-site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]' \
 --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","Name":"Master - 1","InstanceType":"m7g.4xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp3","Iops":3000,"SizeInGB":256,"Throughput":125},"VolumesPerInstance":1}],"EbsOptimized":true}},{"InstanceCount":2,"InstanceGroupType":"CORE","Name":"Core - 2","InstanceType":"r7g.2xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp3","Iops":3000,"SizeInGB":256,"Throughput":125},"VolumesPerInstance":1}],"EbsOptimized":true}},{"InstanceCount":1,"InstanceGroupType":"TASK","Name":"任务节点 - 1","InstanceType":"r7g.2xlarge","BidPrice":"OnDemandPrice","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp3","Iops":3000,"SizeInGB":256,"Throughput":125},"VolumesPerInstance":1}],"EbsOptimized":true}},{"InstanceCount":1,"InstanceGroupType":"TASK","Name":"任务节点 - 2","InstanceType":"r6g.2xlarge","BidPrice":"OnDemandPrice","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp3","Iops":3000,"SizeInGB":256,"Throughput":125},"VolumesPerInstance":1}],"EbsOptimized":true}}]' \
 --bootstrap-actions '[{"Args":[],"Name":"initSparkThriftServer","Path":"s3://Bucket/init/initSparkThriftServer.sh"}]' \
 --scale-down-behavior "TERMINATE_AT_TASK_COMPLETION" \
 --ebs-root-volume-size "100" \
 --auto-termination-policy '{"IdleTimeout":7200}' \
 --os-release-label "2.0.20231206.0" \
 --region "ap-southeast-1"
)
echo "create cluster result ${result}"
cluster_id=$(echo "$result" | jq -r '.ClusterId')
echo "cluster id: ${cluster_id}"
#集群创建需要 6-10 分钟
sleep 600
# 获取主节点 EC2 ID
master_ins_id=$(aws emr list-instances --cluster-id "${cluster_id}" --instance-group-types MASTER --query 'Instances[0].Ec2InstanceId' | sed 's/"//g')
echo "master instance id: ${master_ins_id}"
#将固定的 ENI 绑定到主节点 EC2,需要将eni_id替换为前面创建网络接口的 ID
eni_id='eni-059a904bc47187fff'
attach_id=$(aws ec2 describe-network-interfaces --network-interface-ids "${eni_id}" --query 'NetworkInterfaces[0].Attachment.AttachmentId' --output text)
#如果已经存在网卡绑定,先接触
if [[ "$attach_id" == eni* ]]; then
    echo "there is an exsiting ENI attachment :${attach_id},need to detach..."
    aws ec2 detach-network-interface --attachment-id "$attach_id"
    sleep 5
fi
echo "attaching ENI the master node..."
aws ec2 attach-network-interface --network-interface-id "${eni_id}" --instance-id "${master_ins_id}" --device-index 2

“StopEMR” 节点的脚本如下,具体逻辑可以根据实际情况进行调整,比如不终止所有集群,只终止某个命名前缀的集群:
# 使用AWS CLI命令检查当前活跃的EMR集群状态
existing_clusters=$(aws emr list-clusters --active --query 'Clusters[].Id' --output text)
# 终止所有活跃的EMR集群
for cluster_id in $existing_clusters; do
echo "terminate emr ${cluster_id} "
aws emr terminate-clusters --cluster-ids $cluster_id
done
PowerShell

附加网卡:使用 Elastic Network Interface 技术,为新创建的集群附加弹性网络接口,确保 IP 地址的固定性,解决了集群重启后 IP 变化导致的连接问题。

执行任务 DAG:这一步骤由 DolphinScheduler 和 Apache Airflow 两个调度系统协同完成。它们负责编排和执行复杂的数据处理工作流,确保各个任务按照预定义的依赖关系有序执行。

关闭集群:当所有任务执行完成后,自动关闭 EMR 集群,避免资源浪费。

四、总结&综述

本文详细介绍了一个针对 AWS EMR 集群定时启停并保持 IP 地址固定的解决方案。该方案巧妙地结合了弹性网卡(ENI)技术和自动化脚本,有效解决了企业在使用 EMR 时面临的两大挑战:资源使用效率与 IP 地址变化问题。

通过实施这个解决方案,可以实现以下优势:首先,通过定时启停集群,显著降低了运营成本,避免了资源的闲置浪费;其次,借助弹性网卡技术,确保了集群主节点 IP 地址的稳定性,使得外部系统能够可靠地与 EMR 集群建立连接,同时也简化了自动化脚本和调度系统的维护工作。

整个方案不仅操作简单直观,而且具有很强的实用性和可扩展性。对于那些需要在特定时间段运行批处理任务的企业来说,这是一个既经济又可靠的解决方案,能够有效平衡成本控制和系统稳定性的需求。

本篇作者

张盼富

亚马逊云科技解决方案架构师,从业十三年,先后经过历云计算、供应链金融、电商等多个行业,担任过高级开发、架构师、产品经理、开发总监等多种角色,有丰富的大数据应用与数据治理经验。加入亚马逊云科技后,致力于通过大数据+AI 技术,帮助企业加速数字化转型。

谭欣

亚马逊云科技解决方案架构师,负责帮助客户设计和优化符合自身业务场景的云架构,并提供技术支持。在直播音视频架构设计方面有着丰富的实战经验。