亚马逊AWS官方博客

EMR Serverless——通过独立部署 Spark History Server 实现对 Spark Logs 统一管理和使用

前言

Amazon EMR 作为云上托管的 Apache Hadoop 大数据处理平台,从发布以来就受到客户的极大认可。越来越多的客户将其原有的 Hadoop 迁移到了 Amazon EMR,以享受其带来的便捷性、扩展性以及成本效益等。而在 2022 年底,AWS 又推出了 EMR Serverless 版本,进一步简化了使用,提升了可靠性,并且在很多实际场景下能获得更优的成本效益。

EMR Serverless 目前提供了 Spark 和 Hive 两种运行环境。在使用 Spark 进行大数据处理的过程中,我们可以借助 Spark UI 查询以下信息,这是快速进行 debug 和 troubleshooting 的必要手段:

  • 每个 Spark 阶段的事件时间作业的有向无环图(DAG)
  • SparkSQL 查询的物理和逻辑计划每个作业的基础 Spark 环境变量

虽然 EMR Serverless 对于执行的每⼀个 Spark Job 均提供了 Spark History Server,但是在某些特定的场景下,使用体验上还是会稍有不便:

  • 企业有大量的数据科学家/数据分析师,他们在使用 CLI/SDK 或者第三方调度平台提交 Job 后,如果要查看 Spark History,仍然需要通过 AWS Console 进行登录,这就要求每⼀个用户都有对应的 IAM User,并进行用户和权限管理。
  • 查看 Spark Job History 需要单独请求来拉起服务(10s-20s 左右加载完成),服务访问使⽤ URI 有效期为 1 小时,超过 1 小时需要重新申请。
  • 每⼀个 Spark Job History 都是启动的⼀个单独的服务,如果需要批量查询多个 Job 信息,需要在多个服务 URI 上进行切换。

此篇文章将介绍独立部署 Spark History Server 的方式,同时企业还可以基于自身实际需求,将 Spark History Server 管理和运营按组织/业务/项目/阶段/Application 进行划分,实现对 Spark Log 的统一管理和查询。

方案说明

架构图

方案概述

1、使用 EMR Serverless 提交 Spark Job,并指定日志存放 S3 位置。

2、编写 Lambda 函数,该函数实现 S3 内 log 文件的复制。

3、配置 EventBridge 事件通知,当 EMR Serverless Job 状态改变(Success/Failed)时,调用 Lambda。

4、使用 EC2 以 Docker 方式部署 Spark History Server,logDirectory 指向 log 转存位置。

5、访问 Spark History Server URI,对执行 job log 进行查询分析。

6、基于实际需求按组织/业务/项目/阶段/Application 进行划分,logs 转存到多个不同的路径,启动多个 Spark History Server 进行管理和使用;选择在一台或多台 EC2 上进行部署。

部署步骤

1、EMR Serverless Job 提交

以 CLI 的方式创建 EMR Serverless Application

#创建  
aws emr-serverless create-application \  
  --type "SPARK" \  
  --name EMR_Spark_Application_1 \  
  --release-label emr-6.10.0 \  
  --initial-capacity '{  
    "DRIVER": {  
        "workerCount": 2,  
        "workerConfiguration": {  
            "cpu": "2vCPU",  
            "memory": "4GB"  
        }  
    },  
    "EXECUTOR": {  
        "workerCount": 6,  
        "workerConfiguration": {  
            "cpu": "4vCPU",  
            "memory": "8GB"  
        }  
    }  
  }' \  
  --maximum-capacity '{  
    "cpu": "400vCPU",  
    "memory": "1024GB"  
  }'  
  
#返回结果如下,记下 applicationId,后续提交 job 时将用到  
{  
    "applicationId": "00fans8lvkasq22l",  
    "name": "EMR_Spark_Application_1",  
    "arn": "arn:aws:emr-serverless:ap-northeast-1:accountid:/applications/00fans8lvkasq22l"  
}  

或者通过 AWS Console 进行创建

启动 Spark Job,设置 Application logs 存储 S3 路径

aws emr-serverless start-job-run \  
--application-id {上一步创建的 applicationId} \  
--execution-role-arn {执行的 Role Arn} \  
--name {作业名称} \  
--job-driver '{  
    "sparkSubmit": {  
        "entryPoint": "{执行代码 S3 位置}",  
        "entryPointArguments": [{执行参数}],  
        "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=2g --conf spark.driver.cores=1 --conf spark.driver.memory=2g --conf spark.executor.instances=1"  
        }  
    }' \  
--configuration-overrides '{  
    "monitoringConfiguration": {  
        "s3MonitoringConfiguration": {  
            "logUri": "{logs 保存的 S3 路径}"  
        }  
    }  
}'  
  
#返回结果如下,记下 jobRunId,后续将基于 jobRunId 进行日志查看  
{  
    "applicationId": "00fans8lvkasq22l",  
    "jobRunId": "00fanv9ffjc3c82m",  
    "arn": "arn:aws:emr-serverless:ap-northeast-1:accountid:/applications/00fans8lvkasq22l/jobruns/00fanv9ffjc3c82m"  
}  

或者通过 AWS Console 操作

2、Lambda 进行 logs 转存

import json  
import boto3  
import subprocess  
import os  
  
def copy_folder(source_bucket, source_folder, destination_bucket, destination_folder):  
    s3_client = boto3.client('s3')  
  
    # 列出源文件夹中的所有对象  
    response = s3_client.list_objects_v2(Bucket=source_bucket, Prefix=source_folder)  
    print(response)  
  
    if 'Contents' in response:  
        for file in response['Contents']:  
            # 构建源对象键和目标对象键  
            source_object = file['Key']  
            destination_object = os.path.join(destination_folder, os.path.relpath(source_object, source_folder))  

            # 如果对象是文件夹,则递归复制子文件夹  
            if source_object.endswith('/'):  
                copy_folder(source_bucket, source_object, destination_bucket, destination_object)  
            else:  
                # 复制文件  
                copy_source = {'Bucket': source_bucket, 'Key': source_object}  
                s3_client.copy_object(CopySource=copy_source, Bucket=destination_bucket, Key=destination_object)  
        print('文件夹复制完成')  
    else:  
        print('源文件夹为空')  

def lambda_handler(event, context):  
    jobRunId = event['detail']['jobRunId']  
    applicationId = event['detail']['applicationId']  
    jobRunName = event['detail']['jobRunName']  
    state = event['detail']['state']  

    destination_bucket = '{logs 目的地 S3 BuckerName}'  
    destination_folder = '{logs 目的地 S3 paths}'  
    #Spark job Application log 设置存放的 S3 桶名  
    source_bucket = "{logs 源 S3 BuckerName}"   
    #Spark job Application log 设置存放的路径  
    source_folder = '{logs 源 S3 paths }'+applicationId+'/jobs/'+jobRunId+'/sparklogs/'  

    copy_folder(source_bucket, source_folder, destination_bucket, destination_folder)  

    return {  
        'statusCode': 200,  
        'body': json.dumps(jobRunId + '处理完成')  
    }  

3、EventBridge 设置事件规则

创建规则

设置 EMR Serverless job state 事件模式

设置目标为 Lambda,并选择步骤 2 创建的函数

4、EC2 使用 Docker 部署 Spark History Server

启动 EC2

赋予 ECR、S3 权限、安全组开放使用的 18080 端口

{  
    "Version": "2012-10-17",  
    "Statement": [  
        {  
            "Effect": "Allow",  
            "Action": [  
                "ecr:GetAuthorizationToken",  
                "ecr:BatchGetImage",  
                "ecr:GetDownloadUrlForLayer"  
            ],  
            "Resource": "*"  
        },  
        {  
            "Sid": "ExampleStatement01",  
            "Effect": "Allow",  
            "Action": [  
                "s3:Get*",  
                "s3:List*",  
                "s3:Put*"  
            ],  
            "Resource": [  
                "arn:aws:s3:::your-bucket-name/*",  
                "arn:aws:s3:::your-bucket-name"  
            ]  
        }  
    ]  
}

配置 EC2

#安装 docker 并启动  
sudo yum install docker  
sudo systemctl start docker  
sudo usermod -aG docker ec2-user  
sudo su  
su ec2-user  

#安装 git  
sudo yum install git  

#部署 spark-ui  
git clone https://github.com/aws-samples/emr-serverless-samples.git  
cd emr-serverless-samples/utilities/spark-ui/  

#Login to ECR  
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 755674844232.dkr.ecr.us-east-1.amazonaws.com  
docker build -t emr/spark-ui .  
docker images  

#指定 Spark Log S3 路径(Lambda 转存后的路径)  
LOG_DIR={Log 存储路径}  
REGION={ap-northeast-1}  
#启动并查看启动情况  
docker run --rm -itd \  
    -p 18080:18080 \  
    -e SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=$LOG_DIR -Dspark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain" \  
    -e AWS_REGION=$REGION \  
    emr/spark-ui  

docker ps  

5、使用 Spark History Server

使用 EC2 IP+Dorck 启用端口号进行访问


可进一步完善方案

EMR Serverless Application 的创建,Spark Job 的提交,除了以上方式,企业还可以利用现有的大数据调用平台来调度和执行,保存出入参(Job Name / Application ID / Job ID),然后单独封装⼀个前端页面用读取保存的 Job 记录进行展示,还可加入包括权限控制、搜索等功能,最后通过点击 URI 跳转进入 Spark History Server 中 Jobs 信息页面。

结论

本文介绍了使用 EventBridge,以 EMR Serverless Spark Job 的执行状态的变更为事件驱动,调用 Lambda 函数,将 Spark log 进行转存,再通过在 EC2 上使用 Docker 方式部署 Spark History Server,实现对 Spark Logs 的统一管理和运营。

该方案解决了特定场景下分析使用者必须使用 AWS User 账号登录的不便性,并且在管理和使用上提升了体验。企业可以结合自身需求,对 Spark History Server 的使用和管理方式进行合理选择。

本篇作者

王骁

AWS 解决方案架构师,负责基于 AWS 云计算方案架构的咨询和设计,在国内推广 AWS 云平台技术和各种解决方案,具有丰富的企业 IT 架构经验,目前侧重于于大数据领域的研究。