亚马逊AWS官方博客

使用Amazon EMR Serverless Storage简化运维节省成本

一、前言

我们知道Spark作业在运行过程中需要临时存储来保存计算过程中产生的Shuffle数据,具体为每个作业的配置多大的存储空间来保存Shuffle数据,在作业运行之前我们不容易评估,由于可能的数据倾斜我们可能还要为Executor配置更多的存储。由于Shuffle数据的存在,Spark DRA(Dynamic Resource Allocation)在容器中运行时没有External shuffle service(非ON YARN调度,而是在比如k8s/EMR Serverless)并不能很好的工作,因为Shuffle数据可能会被作业的其它的Stage引用,如果释放Executor会造成Shuffle数据拉取失败,Stage重算,虽然Spark在k8s中提供了shuffle tracking的方式让DRA能够工作,但依然很不高效,依赖垃圾回收机制和shuffleTracking.timeout来控制Executor的释放。令人兴奋的时,亚马逊云科技在EMR Serveless 7.12+版本推出了EMR Serverless Storage用户无需再为作业配置磁盘存储空间,EMR Serverless Storage是Remote Shuffle不再和Executor绑定,Spark DRA可以更高效的工作,也就代表着资源能够更快的释放,成本可以得到更好的节省。

本篇内容通过分析EMR Serverless Storage在TPCDS 3TB上的性能对比测试来帮助大家衡量什么类型的作业更适合在EMR Serverless Storage上运行。同时也会提供作业Shuffle数据获取工具和MCP来帮助大家分析现有作业的shuffle数据是否可以适合迁移到EMR Serverless.

二、分析结论和限制

我们先讲对于EMR Serverless Storage的分析结果和使用限制,之后再看详细的分析数据和提供的工具及MCP.

2.1 分析结论

  1. TPCDS 105个SQL, 使用EMR Serverless Storage相比不使用在相同初始资源开启DRA运行,总成本节省15.5%, 时间基本持平。
  2. 105个SQL中10GB-100GB的shuffle数据量的SQL有20个,平均成本节省13.32%,时间节省6.5%。
  3. 100GB~200GB(TPCDS-3TB没有200GB以上shuffle数据量的SQL)的shuffle数据量SQL3个,平均成本节省55.16%,时间节省25.35%。
  4. 10GB以下shuffle数据量的SQL在EMRServerless Storage上并没有明显的性能和成本优势

基于对TPCDS-3TB的测试分析,我们可以了解到,当我们作业的Shuffle数据量在10GB以上,EMR Serverless Storage才能为我们带来成本和性能的优势。如何获取当前在EMR Serverless上运行的作业shuffle数据大小,后边会提供工具和MCP。

2.2 使用限制

  1. 截止当前(2025-12-12)EMR Serverless Storage只在EMR Serverless(且EMR 7.12+)上支持,在EMR on EC2,EMR on EKS上都不支持,未来有计划支持。
  2. EMR Serverless Storage对于每个Job支持的最大中间结果存储是200GB,如果shuffle读写超过200GB作业会报错,提示超过限制。
  3. EMR Serverless Storage不支持配置worker为1或者2vcpus。

详细的限制信息可以参考这里

三、测试过程和结果

3.1 环境信息

EMR Serverless 版本: 7.12.0, 两个application,一个开启serverless storage,一个关闭serverless storage
Architecture: arm64
Region: us-east-1
数据集: TPCDS-3TB
Driver: 4Core,4G memory
Executor: dynamicAllocation.initialExecutors 3,4Core,8G memory
Storage: 对于不开启EMR Serverless Storage的application 没有配置额外的存储,每个Executor使用默认的20GB存储,且是免费的。开启sererless storage的applicatio存储无需配置。

两个application没有配置pre-initialized capacity,105个SQL依次提交执行

3.2 运行结果

  • 根据Shuffle数据量的不同级别,Serverless Storage和Standard(未使用serverless storage)的性能表现有显著差异, 以下表格展示了不同Shuffle级别下的详细对比:
  • 成本-性能综合对比图
  • 不同Shuffle级别的Storage优势分析图表
  • 所有查询成本节省对比

四、分析作业shuffe数据

4.1 EMR Serverless Spark shuffle数据获取

Spark作业的Shuffle数据量可以通过解析Spark event log获取,对于EMR Serverless而言,作业的event log在作业提交时配置s3目录,就可以将event log放到s3上,一般的配置如下,具体可以参看这里

"s3MonitoringConfiguration": {
    "logUri": "s3://xxxx/logs/spark-event-log"
},

有了event log可以通过如下代码解析出Spark Job Shuffle数据量

#!/usr/bin/env python3
"""
Spark Event Log Shuffle Analyzer

Analyzes Spark event logs from S3 to extract shuffle read/write metrics.
"""

import argparse
import json
import boto3
import csv
import threading
import logging
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict


def setup_logging():
    """Setup logging configuration."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    return logging.getLogger(__name__)


def parse_args():
    parser = argparse.ArgumentParser(
        description="Analyze Spark event logs for shuffle metrics",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example:
  python analyze_spark_shuffle.py \\
    --event-log-base-path s3://xxxxx/spark-event-log/ \\
    --application-id 00g1m3ocn5clib09 \\
    --job-ids 00g1obkrn61ab00b,00g1obkrn61ab00c \\
    --threads 4 \\
    --output results.csv
        """
    )

    parser.add_argument(
        "--event-log-base-path",
        required=True,
        help="Base S3 path for event logs (e.g., s3://bucket/spark-event-log/)"
    )

    parser.add_argument(
        "--application-id",
        required=True,
        help="Spark application ID"
    )

    parser.add_argument(
        "--job-ids",
        required=True,
        help="Comma-separated list of Spark job IDs"
    )

    parser.add_argument(
        "--threads",
        type=int,
        default=10,
        help="Number of threads for parallel processing (default: 4)"
    )

    parser.add_argument(
        "--output",
        help="Output CSV file path (default: {application_id}_shuffle.csv)"
    )

    return parser.parse_args()


def list_s3_files(s3_path):
    """List files in S3 path, excluding appstatus files."""
    s3 = boto3.client('s3')

    if not s3_path.startswith('s3://'):
        raise ValueError("S3 path must start with s3://")

    path_parts = s3_path[5:].split('/', 1)
    bucket = path_parts[0]
    prefix = path_parts[1] if len(path_parts) > 1 else ""

    files = []
    paginator = s3.get_paginator('list_objects_v2')

    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        if 'Contents' in page:
            for obj in page['Contents']:
                key = obj['Key']
                filename = key.split('/')[-1]
                if not filename.startswith('appstatus'):
                    files.append(f"s3://{bucket}/{key}")

    return files


def read_s3_file(s3_path):
    """Read content from S3 file."""
    s3 = boto3.client('s3')

    path_parts = s3_path[5:].split('/', 1)
    bucket = path_parts[0]
    key = path_parts[1]

    response = s3.get_object(Bucket=bucket, Key=key)
    return response['Body'].read().decode('utf-8')


def parse_event_log(content):
    """Parse event log content and extract shuffle metrics."""
    shuffle_read_bytes = 0
    shuffle_write_bytes = 0
    shuffle_read_records = 0
    shuffle_write_records = 0

    for line in content.strip().split('\n'):
        if not line.strip():
            continue

        try:
            event = json.loads(line)
            event_type = event.get('Event')

            if event_type == 'SparkListenerTaskEnd':
                task_metrics = event.get('Task Metrics', {})

                # Shuffle read metrics
                shuffle_read = task_metrics.get('Shuffle Read Metrics', {})
                if shuffle_read:
                    remote_bytes = shuffle_read.get('Remote Bytes Read', 0)
                    local_bytes = shuffle_read.get('Local Bytes Read', 0)
                    shuffle_read_bytes += remote_bytes + local_bytes
                    shuffle_read_records += shuffle_read.get('Total Records Read', 0)

                # Shuffle write metrics
                shuffle_write = task_metrics.get('Shuffle Write Metrics', {})
                if shuffle_write:
                    shuffle_write_bytes += shuffle_write.get('Shuffle Bytes Written', 0)
                    shuffle_write_records += shuffle_write.get('Shuffle Records Written', 0)

        except json.JSONDecodeError:
            continue

    return {
        'shuffle_read_bytes': shuffle_read_bytes,
        'shuffle_write_bytes': shuffle_write_bytes,
        'shuffle_read_records': shuffle_read_records,
        'shuffle_write_records': shuffle_write_records
    }


def process_job(application_id, job_id, event_log_base_path, logger):
    """Process a single job and return metrics."""
    try:
        event_log_path = f"{event_log_base_path.rstrip('/')}/applications/{application_id}/jobs/{job_id}/sparklogs/eventlog_v2_{job_id}/"

        logger.info(f"Processing job {job_id}...")

        files = list_s3_files(event_log_path)

        total_shuffle_read_bytes = 0
        total_shuffle_write_bytes = 0
        total_shuffle_read_records = 0
        total_shuffle_write_records = 0

        for file_path in files:
            content = read_s3_file(file_path)
            metrics = parse_event_log(content)

            total_shuffle_read_bytes += metrics['shuffle_read_bytes']
            total_shuffle_write_bytes += metrics['shuffle_write_bytes']
            total_shuffle_read_records += metrics['shuffle_read_records']
            total_shuffle_write_records += metrics['shuffle_write_records']

        return {
            'application_id': application_id,
            'job_id': job_id,
            'shuffle_read_bytes': total_shuffle_read_bytes,
            'shuffle_write_bytes': total_shuffle_write_bytes,
            'shuffle_read_records': total_shuffle_read_records,
            'shuffle_write_records': total_shuffle_write_records,
            'shuffle_total_bytes': total_shuffle_read_bytes + total_shuffle_write_bytes,
            'shuffle_total_records': total_shuffle_read_records + total_shuffle_write_records
        }

    except Exception as e:
        logger.error(f"Error processing job {job_id}: {e}")
        return {
            'application_id': application_id,
            'job_id': job_id,
            'shuffle_read_bytes': 0,
            'shuffle_write_bytes': 0,
            'shuffle_read_records': 0,
            'shuffle_write_records': 0,
            'shuffle_total_bytes': 0,
            'shuffle_total_records': 0
        }


def main():
    logger = setup_logging()
    args = parse_args()

    # Set default output filename if not provided
    if not args.output:
        args.output = f"{args.application_id}_shuffle.csv"

    job_ids = [job_id.strip() for job_id in args.job_ids.split(',')]

    logger.info(f"Processing {len(job_ids)} jobs with {args.threads} threads...")

    results = []

    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = [
            executor.submit(process_job, args.application_id, job_id, args.event_log_base_path, logger)
            for job_id in job_ids
        ]

        for future in futures:
            results.append(future.result())

    # Write to CSV
    fieldnames = [
        'application_id', 'job_id', 'shuffle_read_bytes', 'shuffle_write_bytes',
        'shuffle_read_records', 'shuffle_write_records', 'shuffle_total_bytes', 'shuffle_total_records'
    ]

    with open(args.output, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)

    logger.info(f"Results written to {args.output}")

    # Log summary
    logger.info("Summary:")
    for result in results:
        logger.info(f"Job {result['job_id']}: "
                   f"Read {result['shuffle_read_bytes']:,} bytes, "
                   f"Write {result['shuffle_write_bytes']:,} bytes, "
                   f"Total {result['shuffle_total_bytes']:,} bytes")


if __name__ == "__main__":
    main()
# 上边代码保存为analyze_spark_shuffle.py
# --application-id 是emr serverless的application id, --job-ids 是applicaiton中的job id,可以指定多个,多个可以并行解析
uv run python analyze_spark_shuffle.py \
    --event-log-base-path s3://xxxxx/spark-event-log/ \
    --application-id xxxxx \
    --job-ids xxxx,xxxx \
    --threads 5
有了作业的Shuffle数据,就可以根据数据结合上文的分析判断,当前作业是否适合迁移到EMR Serverless.

4.2 Spark Event Log MCP

如果我们想对Spark作业的性能和Shuffle数据做更全面的分析,这里开发了一个Event Log的MCP,直接添加到咱们的AI工具里面比如kiro-cli,就可以帮助我们做自动的分析。使用方式如下,项目地址点击这里

  • 配置MCP
{
  "mcpServers": {
    "spark-eventlog": {
      "type": "stdio",
      "command": "uvx",
      "args": [
        "--from",
        "git+https://github.com/yhyyz/spark-eventlog-mcp",
        "spark-eventlog-mcp"
      ],
      "env": {
        "MCP_TRANSPORT": "stdio"
      }
    }
  }
}
  • 使用方式如下,只要告诉kiro event log路径,kiro就可以使用我们的mcp,来分析了,分析之后的结果会生成一个html的报告。如果mcp使用studio模式,报告就在本地机器上。如果mcp配置使用http模式,点击http链接查看即可。 下面是一个分析的例子,第一个分析报告是emr on eks spark作业分析报告部分截图。第二个分析报告是emr serverless的作业的分析报告。对于emr serverless作业的spark event log日志就在你配制的路径下比如:
     

    bucket_name=xxx
    application_id=xxxx
    job_id=xxxx
    echo "s3://${bucket_name}/spark-event-log/applications/${application_id}/jobs/${job_id}/sparklogs/eventlog_v2_${job_id}/"
  • 分析报告的结果

五、总结

本篇内容通过对EMR Serverless Storage在TPCDS 3TB基准测试上的深入分析,我们能更好地了解EMR Serverless Storage的使用场景,适合shuffle数据量超过10GB的作业,数据量越大的Shuffle密集型作业,收益越明显,对于小于10GB Shuffle数据的作业,传统存储方式可能更经济。当然这都是通过TPCDS的数据的分析而来,建议大家作为一个参考,可以在自己的作业上根据Shuffle数据量做相关测试。EMR Serverless Spark作业Shuffle数据量的获取方式可以根据本文提供的方式快速获取。如果想要对任何的Spark Event Log日志做分析可以通过本文的MCP Server获取更为全面的分析报告。

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

潘超

亚马逊云科技数据分析解决方案架构师。负责客户大数据解决方案的咨询与架构设计,在开源大数据方面拥有丰富的经验。工作之外喜欢爬山。

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用