亚马逊AWS官方博客

借助EventBridge及Lambda 实现EMR 自动调整集群大小

随着客户的业务发展,终端用户的数据量以及大数据分析的需求也随之增加。此时,大数据分析的成本也随之上升。AWS 提供多种工具协助客户做成本优化,其中使用EMR on EC2 Spot Instances是常用且有效的方式,节省可高达90% 。客户在高峰期大量使用Spot时可能会出现申请失败的情况,EMR 服务在无法满足申请Spot需求时,会在60分钟内重试,如60分钟后仍无法成功,则会放弃重试,后期即便有充足的Spot也不会重新发起申请。因此对于此情况下需要节省成本依旧使用Spot实例的客户,只能通过多次手动调整集群大小来获取足够的Spot实例运行EMR任务,大大增加维护成本。

以下方案是基于客户有大量每日需要运行4-8小时的临时集群且集群名称保持一致,Spot使用量大因此频繁性产生申请失败,通过EventBridge及Lambda实现EMR自动调整集群大小。

服务介绍

Amazon EventBridge

Amazon EventBridge 是一种无服务器事件总线,可使用从您的应用程序、集成式软件即服务 (SaaS) 应用程序和 AWS 服务生成的事件,更轻松地大规模构建事件驱动型应用程序。EventBridge 提供从事件源到目标对象的实时数据流。借助EventBridge,可实现AWS Services之间基于规则的实时驱动,同时也支持定时任务式的交互驱动。

AWS Lambda

AWS Lambda 是一项高可用的ServerLess计算服务,可使用户无需预配置或管理服务器即可运行代码。用户可以运行Lambda以响应事件,在使用时只需负责自己写的代码(支持如Node.js、Python、Java等7种编程语言),通过代码来实现业务逻辑。基于Lambda 用户可以实现在云上轻松实现服务间的自动化调用,提升云上服务效率。

Amazon DynamoDB

Amazon DynamoDB 是一种完全托管式、无服务器的 NoSQL 键值数据库,旨在运行任何规模的高性能应用程序。用户可以利用DynamoDB 作为轻量化的键值数据库,用于存放自动化方案中需要引用的参数。同时,DynamoDB与AWS 云上服务有多个灵活接口及SDK,易于服务间交互。

方案架构

基于客户的需求,此方案架构可以在新建EMR集群时触发EventBridge规则,从而触发Lambda记录EMR集群的id, 名称, 创建时间于DynamoDB。同时,通过EventBridge Schedule 定期任务的功能,可触发Lambda 检查集群是否当前运行Spot 数量小于目标值,如是,则帮助自动执行调整集群大小动作。

通过此方案,客户可以做到无人工干预地调整集群Spot数量大小到目标值,可有效地降低成本及提升运维效率。

方案配置

创建Dynamodb

  1. 创建emr-newcluster表

  1. 创建emr-resize-cluster表

更新需要进行resize的cluster名称:

创建Lambda

  1. 创建emr_dw_new_cluster_event函数
def lambda_handler(event, context):
    #从event中读取新建EMR集群的cluster id,名称及创建时间
    cid=event['detail']['clusterId']
    name=event['detail']['name']    
    timestamp=event['time']
    #将对应信息写入DynamoDB
    dynamodb.put_item(TableName='emr-newcluster',    
    Item={'clusterid':{'S':cid},'clustername':{'S':name},'timestamp':{'S':timestamp}})

注:对应IAM Role需要有dynamodb table 'emr-newcluster' 的putitem权限。

  1. 创建emr-dw-resize函数
import json
import boto3

emr = boto3.client('emr')
dynamodb = boto3.client('dynamodb')


def resizeinstancefleet(resizecid):
    response = emr.list_instance_fleets(
        ClusterId=resizecid);
        
    fleets = {}
    fleets["InstanceFleets"]={};
    #对比运行中的Spot以及目标Spot数量,如果小于目标值,则执行自动resize
    for f in response["InstanceFleets"]:
        if f["InstanceFleetType"] == "TASK":
            fleetid=f["Id"];
            targetspot=f["TargetSpotCapacity"];
            provisionedspot=f["ProvisionedSpotCapacity"];
            targetondemand=f["TargetOnDemandCapacity"];
            if provisionedspot < targetspot:
                response2 = emr.modify_instance_fleet(
                    ClusterId=resizecid,
                    InstanceFleet={
                        'InstanceFleetId': fleetid,
                        'TargetOnDemandCapacity': targetondemand,
                        'TargetSpotCapacity': targetspot
                    }
                );

def lambda_handler(event, context):
    
    response=dynamodb.scan(TableName='emr-resize-cluster')
    
    cn=[]
    i=0
    
    while(i < response['Count']):
        cn.append(response['Items'][i]['clustername']['S'])
        i=i+1
        
    for cname in cn:
        response1=dynamodb.scan(TableName='emr-newcluster',FilterExpression="clustername= :n",ExpressionAttributeValues={":n":{"S":cname}})
        
        cid=[]
        j=0
        
        while(j < response1['Count']):
            cid.append(response1['Items'][j]['clusterid']['S'])
            j=j+1
            
        for c in cid:
            response2=emr.describe_cluster(ClusterId=c)
            state=response2['Cluster']['Status']['State']
            if((state=='WAITING')):
                resizeinstancefleet(c)

注:对应IAM Role需要有以下权限:
dynamodb table 'emr-newcluster' 和'emr-resize-cluster'的scan权限。
emr listinstancefleets,describe_cluster和modifyinstancefleet权限。

创建EventBridge

  1. 配置emr_dw_new_cluster_event Rule

a. Event pattern

{
  "source": ["aws.emr"],
  "detail-type": ["EMR Cluster State Change"],
  "detail": {
    "state": ["STARTING"]
  }
}

b. Targets

  1. 配置emr_dw_schedule Rule

a. Event schedule

b. Targets

方案测试

测试说明

由于测试环境中难以模拟spot申请失败,因此通过固定值的方式来模拟resize:

if provisionedspot <= targetspot:
                response2 = emr.modify_instance_fleet(
                    ClusterId=resizecid,
                    InstanceFleet={
                        'InstanceFleetId': fleetid,
                        'TargetOnDemandCapacity': targetondemand,
                        'TargetSpotCapacity': 20
                    }
                );

新建cluster name为emr-eventbase-resize集群:

aws emr create-cluster --applications Name=Hadoop Name=Hive Name=Pig Name=Hue Name=Sqoop Name=Presto Name=Tez Name=Oozie Name=Spark Name=Ganglia --ebs-root-volume-size 10 --ec2-attributes '{"KeyName":"EmrKeyPair","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-091aff2145a19c21a","EmrManagedSlaveSecurityGroup":"sg-052e14156908b936a","EmrManagedMasterSecurityGroup":"sg-06de70e887339b988"}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-5.35.0 --log-uri 's3n://aws-logs-602709463925-us-west-2/elasticmapreduce/' --name 'emr-eventbase-resize' --configurations '[{"Classification":"hive-site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --instance-fleets '[{"InstanceFleetType":"TASK","TargetOnDemandCapacity":0,"TargetSpotCapacity":4,"LaunchSpecifications":{"SpotSpecification":{"TimeoutDurationMinutes":60,"TimeoutAction":"TERMINATE_CLUSTER"}},"InstanceTypeConfigs":[{"WeightedCapacity":4,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"BidPriceAsPercentageOfOnDemandPrice":100,"InstanceType":"m5.xlarge"}],"Name":"Task - 3"},{"InstanceFleetType":"MASTER","TargetOnDemandCapacity":1,"TargetSpotCapacity":0,"LaunchSpecifications":{},"InstanceTypeConfigs":[{"WeightedCapacity":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"BidPriceAsPercentageOfOnDemandPrice":100,"InstanceType":"m5.xlarge"}],"Name":"Master - 1"},{"InstanceFleetType":"CORE","TargetOnDemandCapacity":0,"TargetSpotCapacity":4,"LaunchSpecifications":{"SpotSpecification":{"TimeoutDurationMinutes":60,"TimeoutAction":"TERMINATE_CLUSTER"}},"InstanceTypeConfigs":[{"WeightedCapacity":4,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"BidPriceAsPercentageOfOnDemandPrice":100,"InstanceType":"m5.xlarge"}],"Name":"Core - 2"}]' --region us-west-2

查看Dynamodb emr-newcluster表,有对应的item

Lambda可看到对应的Invocation

Cluster进入waiting状态时,TASK Node已Resize为20 Spot units:

对于生产中Spot申请失败后的Resize,可在生产中进行验证。

总结

简而言之,AWS 平台上服务多样且灵活。用户可以基于生产需求,通过无服务方式来实现灵活的配置以及自动化流程。AWS Lambda可以通过定制化的方式来满足不同业务场景的需求,也可以和其他相关服务集成进一步提升运维自动化的能力。基于此方案,客户可以做到无人工干预地调整集群Spot数量大小到目标值,可有效地降低成本及提升运维效率。

本篇作者

梁绮莹

亚马逊云科技解决方案架构师,专注于数字原生企业的云架构设计和咨询,负责支持全球头部电商公司云项目。在云网络、应用交付、应用层安全、CDN、容器及微服务等领域有丰富的实战经验。

刘亚彬

AWS 解决方案架构师。负责基于 AWS 的云计算方案架构的咨询和设计,同时致力于 AWS 云服务在国内的应用和推广。在加入 AWS 前,拥有超过15年项目实施经验,曾就职于Citrix,主要服务于国内外金融类客户。